Whamcloud - gitweb
* vibnal fixes
[fs/lustre-release.git] / lnet / klnds / ralnd / ralnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include "ranal.h"
25
26 int
27 kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
28 {
29         /* I would guess that if kranal_get_peer (nid) == NULL,
30            and we're not routing, then 'nid' is very distant :) */
31         if ( nal->libnal_ni.ni_pid.nid == nid ) {
32                 *dist = 0;
33         } else {
34                 *dist = 1;
35         }
36
37         return 0;
38 }
39
40 void
41 kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
42 {
43         kra_device_t *dev;
44         int           i;
45         unsigned long flags;
46
47         CDEBUG(D_NET, "callback for device %d\n", devid);
48
49         for (i = 0; i < kranal_data.kra_ndevs; i++) {
50
51                 dev = &kranal_data.kra_devices[i];
52                 if (dev->rad_id != devid)
53                         continue;
54
55                 spin_lock_irqsave(&dev->rad_lock, flags);
56
57                 if (!dev->rad_ready) {
58                         dev->rad_ready = 1;
59                         wake_up(&dev->rad_waitq);
60                 }
61
62                 spin_unlock_irqrestore(&dev->rad_lock, flags);
63                 return;
64         }
65
66         CWARN("callback for unknown device %d\n", devid);
67 }
68
69 void
70 kranal_schedule_conn(kra_conn_t *conn)
71 {
72         kra_device_t    *dev = conn->rac_device;
73         unsigned long    flags;
74
75         spin_lock_irqsave(&dev->rad_lock, flags);
76
77         if (!conn->rac_scheduled) {
78                 kranal_conn_addref(conn);       /* +1 ref for scheduler */
79                 conn->rac_scheduled = 1;
80                 list_add_tail(&conn->rac_schedlist, &dev->rad_ready_conns);
81                 wake_up(&dev->rad_waitq);
82         }
83
84         spin_unlock_irqrestore(&dev->rad_lock, flags);
85 }
86
kra_tx_t *
kranal_get_idle_tx (int may_block)
{
        /* Allocate an idle tx descriptor.  If 'may_block' is set, sleep
         * until a "normal" descriptor frees up; otherwise fall back on
         * the reserved non-blocking pool and return NULL if that is
         * exhausted too. */
        unsigned long  flags;
        kra_tx_t      *tx = NULL;

        for (;;) {
                spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

                /* "normal" descriptor is free */
                if (!list_empty(&kranal_data.kra_idle_txs)) {
                        tx = list_entry(kranal_data.kra_idle_txs.next,
                                        kra_tx_t, tx_list);
                        break;
                }

                if (!may_block) {
                        /* may dip into reserve pool */
                        if (list_empty(&kranal_data.kra_idle_nblk_txs)) {
                                CERROR("reserved tx desc pool exhausted\n");
                                break;
                        }

                        tx = list_entry(kranal_data.kra_idle_nblk_txs.next,
                                        kra_tx_t, tx_list);
                        break;
                }

                /* block for idle tx */
                spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

                wait_event(kranal_data.kra_idle_tx_waitq,
                           !list_empty(&kranal_data.kra_idle_txs));
                /* loop to retake the lock and re-check: another thread may
                 * have taken the descriptor that woke us */
        }

        /* NB: the lock is still held here, via the 'break's above */
        if (tx != NULL) {
                list_del(&tx->tx_list);

                /* Allocate a new completion cookie.  It might not be
                 * needed, but we've got a lock right now... */
                tx->tx_cookie = kranal_data.kra_next_tx_cookie++;

                /* descriptor must have been returned fully reset
                 * (see kranal_tx_done()) */
                LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
                LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
                LASSERT (tx->tx_conn == NULL);
                LASSERT (tx->tx_libmsg[0] == NULL);
                LASSERT (tx->tx_libmsg[1] == NULL);
        }

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        return tx;
}
140
141 void
142 kranal_init_msg(kra_msg_t *msg, int type)
143 {
144         msg->ram_magic = RANAL_MSG_MAGIC;
145         msg->ram_version = RANAL_MSG_VERSION;
146         msg->ram_type = type;
147         msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
148         /* ram_connstamp gets set when FMA is sent */
149 }
150
151 kra_tx_t *
152 kranal_new_tx_msg (int may_block, int type)
153 {
154         kra_tx_t *tx = kranal_get_idle_tx(may_block);
155
156         if (tx == NULL)
157                 return NULL;
158
159         kranal_init_msg(&tx->tx_msg, type);
160         return tx;
161 }
162
163 int
164 kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
165                                int offset, int nob)
166
167 {
168         /* For now this is almost identical to kranal_setup_virt_buffer, but we
169          * could "flatten" the payload into a single contiguous buffer ready
170          * for sending direct over an FMA if we ever needed to. */
171
172         LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
173         LASSERT (nob >= 0);
174
175         if (nob == 0) {
176                 tx->tx_buffer = NULL;
177         } else {
178                 LASSERT (niov > 0);
179
180                 while (offset >= iov->iov_len) {
181                         offset -= iov->iov_len;
182                         niov--;
183                         iov++;
184                         LASSERT (niov > 0);
185                 }
186
187                 if (nob > iov->iov_len - offset) {
188                         CERROR("Can't handle multiple vaddr fragments\n");
189                         return -EMSGSIZE;
190                 }
191
192                 tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
193         }
194
195         tx->tx_buftype = RANAL_BUF_IMMEDIATE;
196         tx->tx_nob = nob;
197         return 0;
198 }
199
200 int
201 kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
202                           int offset, int nob)
203
204 {
205         LASSERT (nob > 0);
206         LASSERT (niov > 0);
207         LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
208
209         while (offset >= iov->iov_len) {
210                 offset -= iov->iov_len;
211                 niov--;
212                 iov++;
213                 LASSERT (niov > 0);
214         }
215
216         if (nob > iov->iov_len - offset) {
217                 CERROR("Can't handle multiple vaddr fragments\n");
218                 return -EMSGSIZE;
219         }
220
221         tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
222         tx->tx_nob = nob;
223         tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
224         return 0;
225 }
226
int
kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
                          int offset, int nob)
{
        /* Describe a paged (kiov) payload as an array of physical page
         * addresses in tx->tx_phys, ready for registration.  The pages
         * must tile a virtually-contiguous region: every fragment after
         * the first must start at page offset 0, and only the last may
         * be shorter than a page.  Returns 0 on success, -EINVAL on a
         * gap, -EMSGSIZE if more than PTL_MD_MAX_IOV pages are needed. */
        RAP_PHYS_REGION *phys = tx->tx_phys;
        int              resid;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        /* step past fragments wholly consumed by 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
        tx->tx_nob = nob;
        /* tx_buffer records only the starting offset within the first page */
        tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));

        phys->Address = kranal_page2phys(kiov->kiov_page);
        phys++;

        /* bytes still required beyond the first fragment */
        resid = nob - (kiov->kiov_len - offset);
        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR("Can't make payload contiguous in I/O VM:"
                               "page %d, offset %d, len %d \n",
                               (int)(phys - tx->tx_phys),
                               kiov->kiov_offset, kiov->kiov_len);
                        return -EINVAL;
                }

                if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) {
                        CERROR ("payload too big (%d)\n", (int)(phys - tx->tx_phys));
                        return -EMSGSIZE;
                }

                phys->Address = kranal_page2phys(kiov->kiov_page);
                phys++;

                resid -= PAGE_SIZE;
        }

        tx->tx_phys_npages = phys - tx->tx_phys;
        return 0;
}
285
286 static inline int
287 kranal_setup_rdma_buffer (kra_tx_t *tx, int niov,
288                           struct iovec *iov, ptl_kiov_t *kiov,
289                           int offset, int nob)
290 {
291         LASSERT ((iov == NULL) != (kiov == NULL));
292
293         if (kiov != NULL)
294                 return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);
295
296         return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
297 }
298
void
kranal_map_buffer (kra_tx_t *tx)
{
        /* Register tx's payload with its connection's RapidArray device
         * so it can be the source/sink of an RDMA; a no-op when the
         * buffer is already mapped or there is nothing to map.  Only the
         * device scheduler thread may call this (asserted below). */
        kra_conn_t     *conn = tx->tx_conn;
        kra_device_t   *dev = conn->rac_device;
        RAP_RETURN      rrc;

        LASSERT (current == dev->rad_scheduler);

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_MAPPED:
        case RANAL_BUF_VIRT_MAPPED:
                /* nothing to map */
                break;

        case RANAL_BUF_PHYS_UNMAPPED:
                rrc = RapkRegisterPhys(dev->rad_handle,
                                       tx->tx_phys, tx->tx_phys_npages,
                                       &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
                break;

        case RANAL_BUF_VIRT_UNMAPPED:
                rrc = RapkRegisterMemory(dev->rad_handle,
                                         tx->tx_buffer, tx->tx_nob,
                                         &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
                break;
        }
}
335
void
kranal_unmap_buffer (kra_tx_t *tx)
{
        /* Undo kranal_map_buffer(): deregister tx's payload from the
         * RapidArray device.  A no-op when nothing is mapped. */
        kra_device_t   *dev;
        RAP_RETURN      rrc;

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_UNMAPPED:
        case RANAL_BUF_VIRT_UNMAPPED:
                /* nothing mapped */
                break;

        case RANAL_BUF_PHYS_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                /* only the device scheduler thread (de)registers memory */
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
                break;

        case RANAL_BUF_VIRT_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                /* only the device scheduler thread (de)registers memory */
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
                break;
        }
}
373
374 void
375 kranal_tx_done (kra_tx_t *tx, int completion)
376 {
377         ptl_err_t        ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
378         unsigned long    flags;
379         int              i;
380
381         LASSERT (!in_interrupt());
382
383         kranal_unmap_buffer(tx);
384
385         for (i = 0; i < 2; i++) {
386                 /* tx may have up to 2 libmsgs to finalise */
387                 if (tx->tx_libmsg[i] == NULL)
388                         continue;
389
390                 lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
391                 tx->tx_libmsg[i] = NULL;
392         }
393
394         tx->tx_buftype = RANAL_BUF_NONE;
395         tx->tx_msg.ram_type = RANAL_MSG_NONE;
396         tx->tx_conn = NULL;
397
398         spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
399
400         if (tx->tx_isnblk) {
401                 list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
402         } else {
403                 list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
404                 wake_up(&kranal_data.kra_idle_tx_waitq);
405         }
406
407         spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
408 }
409
410 kra_conn_t *
411 kranal_find_conn_locked (kra_peer_t *peer)
412 {
413         struct list_head *tmp;
414
415         /* just return the first connection */
416         list_for_each (tmp, &peer->rap_conns) {
417                 return list_entry(tmp, kra_conn_t, rac_list);
418         }
419
420         return NULL;
421 }
422
423 void
424 kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
425 {
426         unsigned long    flags;
427
428         tx->tx_conn = conn;
429
430         spin_lock_irqsave(&conn->rac_lock, flags);
431         list_add_tail(&tx->tx_list, &conn->rac_fmaq);
432         tx->tx_qtime = jiffies;
433         spin_unlock_irqrestore(&conn->rac_lock, flags);
434
435         kranal_schedule_conn(conn);
436 }
437
void
kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
{
        /* Send 'tx' to 'nid': queue it on an existing connection if one
         * exists, otherwise queue it on the peer and (if necessary) kick
         * the connection daemon to establish a connection. */
        unsigned long    flags;
        kra_peer_t      *peer;
        kra_conn_t      *conn;
        unsigned long    now;
        rwlock_t        *g_lock = &kranal_data.kra_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */

        /* Fast path: existing connection found under the read lock */
        read_lock(g_lock);

        peer = kranal_find_peer_locked(nid);
        if (peer == NULL) {
                read_unlock(g_lock);
                kranal_tx_done(tx, -EHOSTUNREACH);
                return;
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                kranal_post_fma(conn, tx);
                read_unlock(g_lock);
                return;
        }

        /* Making one or more connections; I'll need a write lock... */
        read_unlock(g_lock);
        write_lock_irqsave(g_lock, flags);

        /* State may have changed while the lock was dropped:
         * repeat both lookups under the write lock */
        peer = kranal_find_peer_locked(nid);
        if (peer == NULL) {
                write_unlock_irqrestore(g_lock, flags);
                kranal_tx_done(tx, -EHOSTUNREACH);
                return;
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kranal_post_fma(conn, tx);
                write_unlock_irqrestore(g_lock, flags);
                return;
        }

        LASSERT (peer->rap_persistence > 0);

        if (!peer->rap_connecting) {
                LASSERT (list_empty(&peer->rap_tx_queue));

                /* don't retry before the reconnect interval expires */
                now = CURRENT_SECONDS;
                if (now < peer->rap_reconnect_time) {
                        write_unlock_irqrestore(g_lock, flags);
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                peer->rap_connecting = 1;
                kranal_peer_addref(peer); /* extra ref for connd */

                /* hand the peer to the connection daemon */
                spin_lock(&kranal_data.kra_connd_lock);

                list_add_tail(&peer->rap_connd_list,
                              &kranal_data.kra_connd_peers);
                wake_up(&kranal_data.kra_connd_waitq);

                spin_unlock(&kranal_data.kra_connd_lock);
        }

        /* A connection is being established; queue the message... */
        list_add_tail(&tx->tx_list, &peer->rap_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}
516
void
kranal_rdma(kra_tx_t *tx, int type,
            kra_rdma_desc_t *sink, int nob, __u64 cookie)
{
        /* Post an RDMA of 'nob' bytes between tx's mapped buffer and the
         * peer's 'sink' descriptor, then queue tx so a completion message
         * of 'type' (carrying 'cookie') follows when the RDMA finishes.
         * A zero-length transfer skips the RDMA and completes at once. */
        kra_conn_t   *conn = tx->tx_conn;
        RAP_RETURN    rrc;
        unsigned long flags;

        LASSERT (kranal_tx_mapped(tx));
        LASSERT (nob <= sink->rard_nob);
        LASSERT (nob <= tx->tx_nob);

        /* No actual race with scheduler sending CLOSE (I'm she!) */
        LASSERT (current == conn->rac_device->rad_scheduler);

        /* build the RDMA descriptor: local mapped buffer -> peer's sink */
        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
        tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
        tx->tx_rdma_desc.DstPtr = sink->rard_addr;
        tx->tx_rdma_desc.DstKey = sink->rard_key;
        tx->tx_rdma_desc.Length = nob;
        tx->tx_rdma_desc.AppPtr = tx;

        /* prep final completion message */
        kranal_init_msg(&tx->tx_msg, type);
        tx->tx_msg.ram_u.completion.racm_cookie = cookie;

        if (nob == 0) { /* Immediate completion */
                kranal_post_fma(conn, tx);
                return;
        }

        LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */

        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
        LASSERT (rrc == RAP_SUCCESS);

        /* park tx on the rdma queue until the completion event arrives */
        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);
}
559
int
kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
{
        /* Copy up to 'nob' payload bytes of the message currently held in
         * conn's FMA buffer into 'buffer' and mark the rx message consumed.
         * Returns 0 on success, -EPROTO if the sender supplied less data
         * than expected. */
        __u32      nob_received = nob;
        RAP_RETURN rrc;

        LASSERT (conn->rac_rxmsg != NULL);
        CDEBUG(D_NET, "Consuming %p\n", conn);

        /* nob_received is in/out: RapkFmaCopyOut sets the actual length */
        rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
                             &nob_received, sizeof(kra_msg_t));
        LASSERT (rrc == RAP_SUCCESS);

        conn->rac_rxmsg = NULL;

        if (nob_received < nob) {
                CWARN("Incomplete immediate msg from "LPX64
                      ": expected %d, got %d\n",
                      conn->rac_peer->rap_nid, nob, nob_received);
                return -EPROTO;
        }

        return 0;
}
584
585 ptl_err_t
586 kranal_do_send (lib_nal_t    *nal,
587                 void         *private,
588                 lib_msg_t    *libmsg,
589                 ptl_hdr_t    *hdr,
590                 int           type,
591                 ptl_nid_t     nid,
592                 ptl_pid_t     pid,
593                 unsigned int  niov,
594                 struct iovec *iov,
595                 ptl_kiov_t   *kiov,
596                 int           offset,
597                 int           nob)
598 {
599         kra_conn_t *conn;
600         kra_tx_t   *tx;
601         int         rc;
602
603         /* NB 'private' is different depending on what we're sending.... */
604
605         CDEBUG(D_NET, "sending %d bytes in %d frags to nid:"LPX64" pid %d\n",
606                nob, niov, nid, pid);
607
608         LASSERT (nob == 0 || niov > 0);
609         LASSERT (niov <= PTL_MD_MAX_IOV);
610
611         LASSERT (!in_interrupt());
612         /* payload is either all vaddrs or all pages */
613         LASSERT (!(kiov != NULL && iov != NULL));
614
615         switch(type) {
616         default:
617                 LBUG();
618
619         case PTL_MSG_REPLY: {
620                 /* reply's 'private' is the conn that received the GET_REQ */
621                 conn = private;
622                 LASSERT (conn->rac_rxmsg != NULL);
623
624                 if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
625                         if (nob > RANAL_FMA_MAX_DATA) {
626                                 CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
627                                        nob, nid);
628                                 return PTL_FAIL;
629                         }
630                         break;                  /* RDMA not expected */
631                 }
632
633                 /* Incoming message consistent with RDMA? */
634                 if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
635                         CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
636                                nid, conn->rac_rxmsg->ram_type);
637                         return PTL_FAIL;
638                 }
639
640                 tx = kranal_get_idle_tx(0);
641                 if (tx == NULL)
642                         return PTL_FAIL;
643
644                 rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
645                 if (rc != 0) {
646                         kranal_tx_done(tx, rc);
647                         return PTL_FAIL;
648                 }
649
650                 tx->tx_conn = conn;
651                 tx->tx_libmsg[0] = libmsg;
652
653                 kranal_map_buffer(tx);
654                 kranal_rdma(tx, RANAL_MSG_GET_DONE,
655                             &conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
656                             conn->rac_rxmsg->ram_u.get.ragm_cookie);
657
658                 /* flag matched by consuming rx message */
659                 kranal_consume_rxmsg(conn, NULL, 0);
660                 return PTL_OK;
661         }
662
663         case PTL_MSG_GET:
664                 LASSERT (niov == 0);
665                 LASSERT (nob == 0);
666                 /* We have to consider the eventual sink buffer rather than any
667                  * payload passed here (there isn't any, and strictly, looking
668                  * inside libmsg is a layering violation).  We send a simple
669                  * IMMEDIATE GET if the sink buffer is mapped already and small
670                  * enough for FMA */
671
672                 if ((libmsg->md->options & PTL_MD_KIOV) == 0 &&
673                     libmsg->md->length <= RANAL_FMA_MAX_DATA &&
674                     libmsg->md->length <= kranal_tunables.kra_max_immediate)
675                         break;
676
677                 tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_GET_REQ);
678                 if (tx == NULL)
679                         return PTL_NO_SPACE;
680
681                 if ((libmsg->md->options & PTL_MD_KIOV) == 0)
682                         rc = kranal_setup_virt_buffer(tx, libmsg->md->md_niov,
683                                                       libmsg->md->md_iov.iov,
684                                                       0, libmsg->md->length);
685                 else
686                         rc = kranal_setup_phys_buffer(tx, libmsg->md->md_niov,
687                                                       libmsg->md->md_iov.kiov,
688                                                       0, libmsg->md->length);
689                 if (rc != 0) {
690                         kranal_tx_done(tx, rc);
691                         return PTL_FAIL;
692                 }
693
694                 tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg);
695                 if (tx->tx_libmsg[1] == NULL) {
696                         CERROR("Can't create reply for GET to "LPX64"\n", nid);
697                         kranal_tx_done(tx, rc);
698                         return PTL_FAIL;
699                 }
700
701                 tx->tx_libmsg[0] = libmsg;
702                 tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
703                 /* rest of tx_msg is setup just before it is sent */
704                 kranal_launch_tx(tx, nid);
705                 return PTL_OK;
706
707         case PTL_MSG_ACK:
708                 LASSERT (nob == 0);
709                 break;
710
711         case PTL_MSG_PUT:
712                 if (kiov == NULL &&             /* not paged */
713                     nob <= RANAL_FMA_MAX_DATA && /* small enough */
714                     nob <= kranal_tunables.kra_max_immediate)
715                         break;                  /* send IMMEDIATE */
716
717                 tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
718                 if (tx == NULL)
719                         return PTL_NO_SPACE;
720
721                 rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
722                 if (rc != 0) {
723                         kranal_tx_done(tx, rc);
724                         return PTL_FAIL;
725                 }
726
727                 tx->tx_libmsg[0] = libmsg;
728                 tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
729                 /* rest of tx_msg is setup just before it is sent */
730                 kranal_launch_tx(tx, nid);
731                 return PTL_OK;
732         }
733
734         LASSERT (kiov == NULL);
735         LASSERT (nob <= RANAL_FMA_MAX_DATA);
736
737         tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
738                                  type == PTL_MSG_REPLY ||
739                                  in_interrupt()),
740                                RANAL_MSG_IMMEDIATE);
741         if (tx == NULL)
742                 return PTL_NO_SPACE;
743
744         rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
745         if (rc != 0) {
746                 kranal_tx_done(tx, rc);
747                 return PTL_FAIL;
748         }
749
750         tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
751         tx->tx_libmsg[0] = libmsg;
752         kranal_launch_tx(tx, nid);
753         return PTL_OK;
754 }
755
756 ptl_err_t
757 kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
758              ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
759              unsigned int niov, struct iovec *iov,
760              size_t offset, size_t len)
761 {
762         return kranal_do_send(nal, private, cookie,
763                               hdr, type, nid, pid,
764                               niov, iov, NULL,
765                               offset, len);
766 }
767
768 ptl_err_t
769 kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
770                    ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
771                    unsigned int niov, ptl_kiov_t *kiov,
772                    size_t offset, size_t len)
773 {
774         return kranal_do_send(nal, private, cookie,
775                               hdr, type, nid, pid,
776                               niov, NULL, kiov,
777                               offset, len);
778 }
779
ptl_err_t
kranal_do_recv (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
                unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
                int offset, int mlen, int rlen)
{
        /* Common receive path for kranal_recv()/kranal_recv_pages().
         * 'private' is the connection whose FMA buffer holds the incoming
         * message.  IMMEDIATE payloads are copied straight into the iov;
         * a PUT_REQ is answered with a PUT_ACK describing the sink buffer
         * the sender should RDMA into. */
        kra_conn_t  *conn = private;
        kra_msg_t   *rxmsg = conn->rac_rxmsg;
        kra_tx_t    *tx;
        void        *buffer;
        int          rc;

        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        CDEBUG(D_NET, "conn %p, rxmsg %p, libmsg %p\n", conn, rxmsg, libmsg);

        if (libmsg == NULL) {
                /* GET or ACK or portals is discarding */
                LASSERT (mlen == 0);
                /* NB libmsg is NULL here; lib_finalize is presumably a
                 * no-op for a NULL msg -- confirm against lib-move.c */
                lib_finalize(nal, NULL, libmsg, PTL_OK);
                return PTL_OK;
        }

        switch(rxmsg->ram_type) {
        default:
                LBUG();
                return PTL_FAIL;

        case RANAL_MSG_IMMEDIATE:
                if (mlen == 0) {
                        buffer = NULL;
                } else if (kiov != NULL) {
                        CERROR("Can't recv immediate into paged buffer\n");
                        return PTL_FAIL;
                } else {
                        LASSERT (niov > 0);
                        /* step past fragments wholly consumed by 'offset' */
                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                        }
                        if (mlen > iov->iov_len - offset) {
                                CERROR("Can't handle immediate frags\n");
                                return PTL_FAIL;
                        }
                        buffer = ((char *)iov->iov_base) + offset;
                }
                rc = kranal_consume_rxmsg(conn, buffer, mlen);
                lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
                return PTL_OK;

        case RANAL_MSG_PUT_REQ:
                /* may not block for a tx descriptor here */
                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_conn = conn;
                kranal_map_buffer(tx);

                /* describe the mapped sink buffer so the sender can RDMA */
                tx->tx_msg.ram_u.putack.rapam_src_cookie =
                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;

                tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */

                kranal_post_fma(conn, tx);

                /* flag matched by consuming rx message */
                kranal_consume_rxmsg(conn, NULL, 0);
                return PTL_OK;
        }
}
865
866 ptl_err_t
867 kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
868              unsigned int niov, struct iovec *iov,
869              size_t offset, size_t mlen, size_t rlen)
870 {
871         return kranal_do_recv(nal, private, msg, niov, iov, NULL,
872                               offset, mlen, rlen);
873 }
874
875 ptl_err_t
876 kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
877                    unsigned int niov, ptl_kiov_t *kiov,
878                    size_t offset, size_t mlen, size_t rlen)
879 {
880         return kranal_do_recv(nal, private, msg, niov, NULL, kiov,
881                               offset, mlen, rlen);
882 }
883
884 int
885 kranal_thread_start (int(*fn)(void *arg), void *arg)
886 {
887         long    pid = kernel_thread(fn, arg, 0);
888
889         if (pid < 0)
890                 return(int)pid;
891
892         atomic_inc(&kranal_data.kra_nthreads);
893         return 0;
894 }
895
/* Thread exit bookkeeping: decrement the running-thread count that
 * kranal_thread_start() incremented when this thread was spawned. */
void
kranal_thread_fini (void)
{
        atomic_dec(&kranal_data.kra_nthreads);
}
901
902 int
903 kranal_check_conn_timeouts (kra_conn_t *conn)
904 {
905         kra_tx_t          *tx;
906         struct list_head  *ttmp;
907         unsigned long      flags;
908         long               timeout;
909         unsigned long      now = jiffies;
910
911         LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
912                  conn->rac_state == RANAL_CONN_CLOSING);
913
914         if (!conn->rac_close_sent &&
915             time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
916                 /* not sent in a while; schedule conn so scheduler sends a keepalive */
917                 CDEBUG(D_NET, "Scheduling keepalive %p->"LPX64"\n",
918                        conn, conn->rac_peer->rap_nid);
919                 kranal_schedule_conn(conn);
920         }
921
922         timeout = conn->rac_timeout * HZ;
923
924         if (!conn->rac_close_recvd &&
925             time_after_eq(now, conn->rac_last_rx + timeout)) {
926                 CERROR("%s received from "LPX64" within %lu seconds\n",
927                        (conn->rac_state == RANAL_CONN_ESTABLISHED) ?
928                        "Nothing" : "CLOSE not",
929                        conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
930                 return -ETIMEDOUT;
931         }
932
933         if (conn->rac_state != RANAL_CONN_ESTABLISHED)
934                 return 0;
935
936         /* Check the conn's queues are moving.  These are "belt+braces" checks,
937          * in case of hardware/software errors that make this conn seem
938          * responsive even though it isn't progressing its message queues. */
939
940         spin_lock_irqsave(&conn->rac_lock, flags);
941
942         list_for_each (ttmp, &conn->rac_fmaq) {
943                 tx = list_entry(ttmp, kra_tx_t, tx_list);
944
945                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
946                         spin_unlock_irqrestore(&conn->rac_lock, flags);
947                         CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
948                                conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
949                         return -ETIMEDOUT;
950                 }
951         }
952
953         list_for_each (ttmp, &conn->rac_rdmaq) {
954                 tx = list_entry(ttmp, kra_tx_t, tx_list);
955
956                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
957                         spin_unlock_irqrestore(&conn->rac_lock, flags);
958                         CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
959                                conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
960                         return -ETIMEDOUT;
961                 }
962         }
963
964         list_for_each (ttmp, &conn->rac_replyq) {
965                 tx = list_entry(ttmp, kra_tx_t, tx_list);
966
967                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
968                         spin_unlock_irqrestore(&conn->rac_lock, flags);
969                         CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
970                                conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
971                         return -ETIMEDOUT;
972                 }
973         }
974
975         spin_unlock_irqrestore(&conn->rac_lock, flags);
976         return 0;
977 }
978
/* Scan one connection hash bucket (index 'idx') for timed-out conns,
 * closing or terminating any found.  Also folds every conn's timeout
 * and keepalive into *min_timeoutp so the reaper can size its scan
 * rate. */
void
kranal_reaper_check (int idx, unsigned long *min_timeoutp)
{
        struct list_head  *conns = &kranal_data.kra_conns[idx];
        struct list_head  *ctmp;
        kra_conn_t        *conn;
        unsigned long      flags;
        int                rc;

 again:
        /* NB. We expect to check all the conns and not find any problems, so
         * we just use a shared lock while we take a look... */
        read_lock(&kranal_data.kra_global_lock);

        list_for_each (ctmp, conns) {
                conn = list_entry(ctmp, kra_conn_t, rac_hashlist);

                if (conn->rac_timeout < *min_timeoutp )
                        *min_timeoutp = conn->rac_timeout;
                if (conn->rac_keepalive < *min_timeoutp )
                        *min_timeoutp = conn->rac_keepalive;

                rc = kranal_check_conn_timeouts(conn);
                if (rc == 0)
                        continue;

                /* Timed out: take a ref so the conn can't vanish while the
                 * read lock is dropped, then re-take the lock as a writer
                 * to change the conn's state. */
                kranal_conn_addref(conn);
                read_unlock(&kranal_data.kra_global_lock);

                CERROR("Conn to "LPX64", cqid %d timed out\n",
                       conn->rac_peer->rap_nid, conn->rac_cqid);

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                switch (conn->rac_state) {
                default:
                        LBUG();

                case RANAL_CONN_ESTABLISHED:
                        kranal_close_conn_locked(conn, -ETIMEDOUT);
                        break;

                case RANAL_CONN_CLOSING:
                        kranal_terminate_conn_locked(conn);
                        break;
                }

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

                kranal_conn_decref(conn);

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock(&kranal_data.kra_global_lock);
}
1036
/* Connection daemon thread.  Under kra_connd_lock it services two work
 * queues: passively-accepted sockets awaiting a handshake
 * (kra_connd_acceptq) and peers needing an active connection attempt
 * (kra_connd_peers).  When both queues are empty it sleeps on
 * kra_connd_waitq until new work arrives or shutdown is flagged. */
int
kranal_connd (void *arg)
{
        long               id = (long)arg;
        char               name[16];
        wait_queue_t       wait;
        unsigned long      flags;
        kra_peer_t        *peer;
        kra_acceptsock_t  *ras;
        int                did_something;

        snprintf(name, sizeof(name), "kranal_connd_%02ld", id);
        kportal_daemonize(name);
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

        while (!kranal_data.kra_shutdown) {
                did_something = 0;

                /* Handshake the next passively-accepted socket, dropping
                 * the queue lock while the (blocking) handshake runs. */
                if (!list_empty(&kranal_data.kra_connd_acceptq)) {
                        ras = list_entry(kranal_data.kra_connd_acceptq.next,
                                         kra_acceptsock_t, ras_list);
                        list_del(&ras->ras_list);

                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        CDEBUG(D_NET,"About to handshake someone\n");

                        kranal_conn_handshake(ras->ras_sock, NULL);
                        kranal_free_acceptsock(ras);

                        CDEBUG(D_NET,"Finished handshaking someone\n");

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                /* Actively connect to the next queued peer; the peer's
                 * connd-list ref is dropped once the attempt completes. */
                if (!list_empty(&kranal_data.kra_connd_peers)) {
                        peer = list_entry(kranal_data.kra_connd_peers.next,
                                          kra_peer_t, rap_connd_list);

                        list_del_init(&peer->rap_connd_list);
                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        kranal_connect(peer);
                        kranal_peer_decref(peer);

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                /* Re-check both queues before sleeping */
                if (did_something)
                        continue;

                /* Nothing to do: sleep until woken via kra_connd_waitq */
                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                schedule ();

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
        }

        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

        kranal_thread_fini();
        return 0;
}
1112
1113 void
1114 kranal_update_reaper_timeout(long timeout)
1115 {
1116         unsigned long   flags;
1117
1118         LASSERT (timeout > 0);
1119
1120         spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1121
1122         if (timeout < kranal_data.kra_new_min_timeout)
1123                 kranal_data.kra_new_min_timeout = timeout;
1124
1125         spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
1126 }
1127
/* Reaper thread: periodically walks the connection hash table checking
 * each conn for timeouts (via kranal_reaper_check()).  The walk is
 * paced so every conn is visited several times per minimum
 * timeout/keepalive interval. */
int
kranal_reaper (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        long               timeout;
        int                i;
        int                conn_entries = kranal_data.kra_conn_hash_size;
        int                conn_index = 0;
        /* base_index marks where the current full sweep of the table ends;
         * reaching it means every bucket has been checked once */
        int                base_index = conn_entries - 1;
        unsigned long      next_check_time = jiffies;
        /* minimum timeout gathered during the sweep in progress */
        long               next_min_timeout = MAX_SCHEDULE_TIMEOUT;
        /* minimum timeout established by the last complete sweep */
        long               current_min_timeout = 1;

        kportal_daemonize("kranal_reaper");
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* I wake up every 'p' seconds to check for timeouts on some
                 * more peers.  I try to check every connection 'n' times
                 * within the global minimum of all keepalive and timeout
                 * intervals, to ensure I attend to every connection within
                 * (n+1)/n times its timeout intervals. */
                const int     p = 1;
                const int     n = 3;
                unsigned long min_timeout;
                int           chunk;

                /* careful with the jiffy wrap... */
                timeout = (long)(next_check_time - jiffies);
                if (timeout > 0) {
                        /* not due yet: sleep until the next check time or a
                         * wakeup on kra_reaper_waitq, then re-evaluate */
                        set_current_state(TASK_INTERRUPTIBLE);
                        add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);

                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                        schedule_timeout(timeout);

                        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                        set_current_state(TASK_RUNNING);
                        remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
                        continue;
                }

                if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
                        /* new min timeout set: restart min timeout scan */
                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        base_index = conn_index - 1;
                        if (base_index < 0)
                                base_index = conn_entries - 1;

                        if (kranal_data.kra_new_min_timeout < current_min_timeout) {
                                current_min_timeout = kranal_data.kra_new_min_timeout;
                                CDEBUG(D_NET, "Set new min timeout %ld\n",
                                       current_min_timeout);
                        }

                        kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
                }
                min_timeout = current_min_timeout;

                spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                LASSERT (min_timeout > 0);

                /* Compute how many table entries to check now so I get round
                 * the whole table fast enough given that I do this at fixed
                 * intervals of 'p' seconds) */
                chunk = conn_entries;
                if (min_timeout > n * p)
                        chunk = (chunk * n * p) / min_timeout;
                if (chunk == 0)
                        chunk = 1;

                for (i = 0; i < chunk; i++) {
                        kranal_reaper_check(conn_index,
                                            &next_min_timeout);
                        conn_index = (conn_index + 1) % conn_entries;
                }

                next_check_time += p * HZ;

                spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                /* Did this chunk step past base_index (i.e. complete a full
                 * sweep)?  The two disjuncts cover the unwrapped and the
                 * wrapped-modulo-conn_entries cases of conn_index. */
                if (((conn_index - chunk <= base_index &&
                      base_index < conn_index) ||
                     (conn_index - conn_entries - chunk <= base_index &&
                      base_index < conn_index - conn_entries))) {

                        /* Scanned all conns: set current_min_timeout... */
                        if (current_min_timeout != next_min_timeout) {
                                current_min_timeout = next_min_timeout;
                                CDEBUG(D_NET, "Set new min timeout %ld\n",
                                       current_min_timeout);
                        }

                        /* ...and restart min timeout scan */
                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        base_index = conn_index - 1;
                        if (base_index < 0)
                                base_index = conn_entries - 1;
                }
        }

        kranal_thread_fini();
        return 0;
}
1240
/* Drain the device's RDMA completion queue.  Each completion names a
 * conn by cqid; the completed tx must be the oldest on that conn's
 * rdmaq (asserted via desc->AppPtr).  Completed txs are moved to the
 * fmaq so the matching PUT_DONE/GET_DONE message gets sent. */
void
kranal_check_rdma_cq (kra_device_t *dev)
{
        kra_conn_t          *conn;
        kra_tx_t            *tx;
        RAP_RETURN           rrc;
        unsigned long        flags;
        RAP_RDMA_DESCRIPTOR *desc;
        __u32                cqid;
        __u32                event_type;

        for (;;) {
                rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE) {
                        /* CQ drained: done */
                        CDEBUG(D_NET, "RDMA CQ %d empty\n", dev->rad_id);
                        return;
                }

                LASSERT (rrc == RAP_SUCCESS);
                LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);

                read_lock(&kranal_data.kra_global_lock);

                conn = kranal_cqid2conn_locked(cqid);
                if (conn == NULL) {
                        /* Conn was destroyed? */
                        CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid);
                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
                LASSERT (rrc == RAP_SUCCESS);

                CDEBUG(D_NET, "Completed %p\n",
                       list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list));

                spin_lock_irqsave(&conn->rac_lock, flags);

                /* pop the oldest tx; it must match this completion */
                LASSERT (!list_empty(&conn->rac_rdmaq));
                tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
                list_del(&tx->tx_list);

                LASSERT(desc->AppPtr == (void *)tx);
                LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
                        tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);

                /* queue the completion message for sending */
                list_add_tail(&tx->tx_list, &conn->rac_fmaq);
                tx->tx_qtime = jiffies;

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                /* Get conn's fmaq processed, now I've just put something
                 * there */
                kranal_schedule_conn(conn);

                read_unlock(&kranal_data.kra_global_lock);
        }
}
1300
/* Drain the device's FMA completion queue.  A normal event schedules
 * the conn named by its cqid so the scheduler processes its incoming
 * messages.  On CQ overrun, events (and hence cqids) have been lost,
 * so every conn on this device is scheduled instead. */
void
kranal_check_fma_cq (kra_device_t *dev)
{
        kra_conn_t         *conn;
        RAP_RETURN          rrc;
        __u32               cqid;
        __u32               event_type;
        struct list_head   *conns;
        struct list_head   *tmp;
        int                 i;

        for (;;) {
                rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE) {
                        /* CQ drained: done */
                        CDEBUG(D_NET, "FMA CQ %d empty\n", dev->rad_id);
                        return;
                }

                LASSERT (rrc == RAP_SUCCESS);

                if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {

                        read_lock(&kranal_data.kra_global_lock);

                        conn = kranal_cqid2conn_locked(cqid);
                        if (conn == NULL) {
                                /* conn may already have been destroyed */
                                CDEBUG(D_NET, "FMA CQID lookup %d failed\n",
                                       cqid);
                        } else {
                                CDEBUG(D_NET, "FMA completed: %p CQID %d\n",
                                       conn, cqid);
                                kranal_schedule_conn(conn);
                        }

                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                /* FMA CQ has overflowed: check ALL conns */
                CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);

                for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {

                        read_lock(&kranal_data.kra_global_lock);

                        conns = &kranal_data.kra_conns[i];

                        list_for_each (tmp, conns) {
                                conn = list_entry(tmp, kra_conn_t,
                                                  rac_hashlist);

                                if (conn->rac_device == dev)
                                        kranal_schedule_conn(conn);
                        }

                        /* don't block write lockers for too long... */
                        read_unlock(&kranal_data.kra_global_lock);
                }
        }
}
1361
1362 int
1363 kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
1364                void *immediate, int immediatenob)
1365 {
1366         int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
1367         RAP_RETURN rrc;
1368
1369         CDEBUG(D_NET,"%p sending msg %p %02x%s [%p for %d]\n",
1370                conn, msg, msg->ram_type, sync ? "(sync)" : "",
1371                immediate, immediatenob);
1372
1373         LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
1374         LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
1375                  immediatenob <= RANAL_FMA_MAX_DATA :
1376                  immediatenob == 0);
1377
1378         msg->ram_connstamp = conn->rac_my_connstamp;
1379         msg->ram_seq = conn->rac_tx_seq;
1380
1381         if (sync)
1382                 rrc = RapkFmaSyncSend(conn->rac_rihandle,
1383                                       immediate, immediatenob,
1384                                       msg, sizeof(*msg));
1385         else
1386                 rrc = RapkFmaSend(conn->rac_rihandle,
1387                                   immediate, immediatenob,
1388                                   msg, sizeof(*msg));
1389
1390         switch (rrc) {
1391         default:
1392                 LBUG();
1393
1394         case RAP_SUCCESS:
1395                 conn->rac_last_tx = jiffies;
1396                 conn->rac_tx_seq++;
1397                 return 0;
1398
1399         case RAP_NOT_DONE:
1400                 if (time_after_eq(jiffies,
1401                                   conn->rac_last_tx + conn->rac_keepalive*HZ))
1402                         CDEBUG(D_WARNING, "EAGAIN sending %02x (idle %lu secs)\n",
1403                                msg->ram_type, (jiffies - conn->rac_last_tx)/HZ);
1404                 return -EAGAIN;
1405         }
1406 }
1407
/* Send the next message for 'conn': either progress the close protocol
 * (when not ESTABLISHED), send a keepalive NOOP (when idle), or pop and
 * send the tx at the head of the fmaq.  Runs only in the device
 * scheduler thread. */
void
kranal_process_fmaq (kra_conn_t *conn)
{
        unsigned long flags;
        int           more_to_do;
        kra_tx_t     *tx;
        int           rc;
        int           expect_reply;  /* tx awaits a peer reply (keep it on replyq)? */

        /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
         *       However I will be rescheduled some by an FMA completion event
         *       when I eventually get some.
         * NB 2. Sampling rac_state here races with setting it elsewhere.
         *       But it doesn't matter if I try to send a "real" message just
         *       as I start closing because I'll get scheduled to send the
         *       close anyway. */

        /* Not racing with incoming message processing! */
        LASSERT (current == conn->rac_device->rad_scheduler);

        if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
                if (!list_empty(&conn->rac_rdmaq)) {
                        /* RDMAs in progress */
                        LASSERT (!conn->rac_close_sent);

                        if (time_after_eq(jiffies,
                                          conn->rac_last_tx +
                                          conn->rac_keepalive * HZ)) {
                                CDEBUG(D_NET, "sending NOOP (rdma in progress)\n");
                                kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                                kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                        }
                        return;
                }

                if (conn->rac_close_sent)
                        return;

                /* closing and RDMAs drained: send CLOSE to the peer */
                CWARN("sending CLOSE to "LPX64"\n", conn->rac_peer->rap_nid);
                kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
                rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                if (rc != 0)
                        return;

                conn->rac_close_sent = 1;
                if (!conn->rac_close_recvd)
                        return;

                /* CLOSE exchanged in both directions: terminate the conn */
                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_CLOSING)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                return;
        }

        spin_lock_irqsave(&conn->rac_lock, flags);

        if (list_empty(&conn->rac_fmaq)) {

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                /* nothing queued: keepalive if the conn has been idle */
                if (time_after_eq(jiffies,
                                  conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                        CDEBUG(D_NET, "sending NOOP -> "LPX64" (%p idle %lu(%ld))\n",
                               conn->rac_peer->rap_nid, conn,
                               (jiffies - conn->rac_last_tx)/HZ, conn->rac_keepalive);
                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                }
                return;
        }

        tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
        list_del(&tx->tx_list);
        more_to_do = !list_empty(&conn->rac_fmaq);

        spin_unlock_irqrestore(&conn->rac_lock, flags);

        expect_reply = 0;
        CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n",
               tx, tx->tx_msg.ram_type, tx->tx_cookie);
        switch (tx->tx_msg.ram_type) {
        default:
                LBUG();

        case RANAL_MSG_IMMEDIATE:
                rc = kranal_sendmsg(conn, &tx->tx_msg,
                                    tx->tx_buffer, tx->tx_nob);
                expect_reply = 0;
                break;

        case RANAL_MSG_PUT_NAK:
        case RANAL_MSG_PUT_DONE:
        case RANAL_MSG_GET_NAK:
        case RANAL_MSG_GET_DONE:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 0;
                break;

        case RANAL_MSG_PUT_REQ:
                /* NOTE(review): the buffer is mapped even when the send
                 * returned -EAGAIN, in which case the tx is re-queued below
                 * and this map runs again on retry.  Verify that
                 * kranal_map_buffer() tolerates an already-mapped tx (cf.
                 * GET_REQ, which maps before sending). */
                tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                kranal_map_buffer(tx);
                expect_reply = 1;
                break;

        case RANAL_MSG_PUT_ACK:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;

        case RANAL_MSG_GET_REQ:
                /* map the sink buffer and describe it to the peer */
                kranal_map_buffer(tx);
                tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;
        }

        if (rc == -EAGAIN) {
                /* I need credits to send this.  Replace tx at the head of the
                 * fmaq and I'll get rescheduled when credits appear */
                CDEBUG(D_NET, "EAGAIN on %p\n", conn);
                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add(&tx->tx_list, &conn->rac_fmaq);
                spin_unlock_irqrestore(&conn->rac_lock, flags);
                return;
        }

        LASSERT (rc == 0);

        if (!expect_reply) {
                kranal_tx_done(tx, 0);
        } else {
                /* LASSERT(current) above ensures this doesn't race with reply
                 * processing */
                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add_tail(&tx->tx_list, &conn->rac_replyq);
                tx->tx_qtime = jiffies;
                spin_unlock_irqrestore(&conn->rac_lock, flags);
        }

        if (more_to_do) {
                CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn);
                kranal_schedule_conn(conn);
        }
}
1561
/* Byte-swap every field of an RDMA descriptor in place (used when a
 * received message's magic shows the sender's opposite endianness). */
static inline void
kranal_swab_rdma_desc (kra_rdma_desc_t *d)
{
        __swab64s(&d->rard_key.Key);
        __swab16s(&d->rard_key.Cookie);
        __swab16s(&d->rard_key.MdHandle);
        __swab32s(&d->rard_key.Flags);
        __swab64s(&d->rard_addr.AddressBits);
        __swab32s(&d->rard_nob);
}
1572
1573 kra_tx_t *
1574 kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
1575 {
1576         struct list_head *ttmp;
1577         kra_tx_t         *tx;
1578         unsigned long     flags;
1579
1580         spin_lock_irqsave(&conn->rac_lock, flags);
1581
1582         list_for_each(ttmp, &conn->rac_replyq) {
1583                 tx = list_entry(ttmp, kra_tx_t, tx_list);
1584
1585                 CDEBUG(D_NET,"Checking %p %02x/"LPX64"\n",
1586                        tx, tx->tx_msg.ram_type, tx->tx_cookie);
1587
1588                 if (tx->tx_cookie != cookie)
1589                         continue;
1590
1591                 if (tx->tx_msg.ram_type != type) {
1592                         spin_unlock_irqrestore(&conn->rac_lock, flags);
1593                         CWARN("Unexpected type %x (%x expected) "
1594                               "matched reply from "LPX64"\n",
1595                               tx->tx_msg.ram_type, type,
1596                               conn->rac_peer->rap_nid);
1597                         return NULL;
1598                 }
1599
1600                 list_del(&tx->tx_list);
1601                 spin_unlock_irqrestore(&conn->rac_lock, flags);
1602                 return tx;
1603         }
1604
1605         spin_unlock_irqrestore(&conn->rac_lock, flags);
1606         CWARN("Unmatched reply %02x/"LPX64" from "LPX64"\n",
1607               type, cookie, conn->rac_peer->rap_nid);
1608         return NULL;
1609 }
1610
void
kranal_check_fma_rx (kra_conn_t *conn)
{
        /* Poll the connection's FMA channel for one incoming message.
         * If a message is waiting: fix its byte order if needed, validate
         * version / source NID / connection stamp / sequence number, then
         * dispatch on message type.  The raw message is stashed in
         * conn->rac_rxmsg so the portals callbacks can consume its payload;
         * anything they leave behind is discarded at 'out'. */
        unsigned long flags;
        __u32         seq;
        kra_tx_t     *tx;
        kra_msg_t    *msg;
        void         *prefix;
        RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
        kra_peer_t   *peer = conn->rac_peer;

        if (rrc == RAP_NOT_DONE)                /* nothing waiting: done */
                return;

        CDEBUG(D_NET, "RX on %p\n", conn);

        LASSERT (rrc == RAP_SUCCESS);
        conn->rac_last_rx = jiffies;            /* timestamp for keepalive/timeout */
        seq = conn->rac_rx_seq++;               /* expected sequence number */
        msg = (kra_msg_t *)prefix;

        /* stash message for portals callbacks; they'll NULL
         * rac_rxmsg if they consume it */
        LASSERT (conn->rac_rxmsg == NULL);
        conn->rac_rxmsg = msg;

        if (msg->ram_magic != RANAL_MSG_MAGIC) {
                /* Sender has opposite endianness (or is garbage): byte-swap
                 * the common header in place, plus any embedded RDMA
                 * descriptor for the types that carry one. */
                if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
                        CERROR("Unexpected magic %08x from "LPX64"\n",
                               msg->ram_magic, peer->rap_nid);
                        goto out;
                }

                __swab32s(&msg->ram_magic);
                __swab16s(&msg->ram_version);
                __swab16s(&msg->ram_type);
                __swab64s(&msg->ram_srcnid);
                __swab64s(&msg->ram_connstamp);
                __swab32s(&msg->ram_seq);

                /* NB message type checked below; NOT here... */
                switch (msg->ram_type) {
                case RANAL_MSG_PUT_ACK:
                        kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
                        break;

                case RANAL_MSG_GET_REQ:
                        kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
                        break;

                default:
                        break;
                }
        }

        if (msg->ram_version != RANAL_MSG_VERSION) {
                CERROR("Unexpected protocol version %d from "LPX64"\n",
                       msg->ram_version, peer->rap_nid);
                goto out;
        }

        if (msg->ram_srcnid != peer->rap_nid) {
                CERROR("Unexpected peer "LPX64" from "LPX64"\n",
                       msg->ram_srcnid, peer->rap_nid);
                goto out;
        }

        if (msg->ram_connstamp != conn->rac_peer_connstamp) {
                /* Message belongs to a stale incarnation of this connection */
                CERROR("Unexpected connstamp "LPX64"("LPX64
                       " expected) from "LPX64"\n",
                       msg->ram_connstamp, conn->rac_peer_connstamp,
                       peer->rap_nid);
                goto out;
        }

        if (msg->ram_seq != seq) {
                CERROR("Unexpected sequence number %d(%d expected) from "
                       LPX64"\n", msg->ram_seq, seq, peer->rap_nid);
                goto out;
        }

        if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
                /* This message signals RDMA completion... */
                rrc = RapkFmaSyncWait(conn->rac_rihandle);
                LASSERT (rrc == RAP_SUCCESS);
        }

        if (conn->rac_close_recvd) {
                /* Nothing should follow the peer's CLOSE */
                CERROR("Unexpected message %d after CLOSE from "LPX64"\n",
                       msg->ram_type, conn->rac_peer->rap_nid);
                goto out;
        }

        if (msg->ram_type == RANAL_MSG_CLOSE) {
                /* Peer is closing: move the connection state machine along
                 * under the global lock. */
                CWARN("RX CLOSE from "LPX64"\n", conn->rac_peer->rap_nid);
                conn->rac_close_recvd = 1;
                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_ESTABLISHED)
                        kranal_close_conn_locked(conn, 0);
                else if (conn->rac_state == RANAL_CONN_CLOSING &&
                         conn->rac_close_sent)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                goto out;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                goto out;                       /* ignore traffic while closing */

        switch (msg->ram_type) {
        case RANAL_MSG_NOOP:
                /* Nothing to do; just a keepalive */
                CDEBUG(D_NET, "RX NOOP on %p\n", conn);
                break;

        case RANAL_MSG_IMMEDIATE:
                /* Payload travels inline; hand the header to portals */
                CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);
                lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);
                break;

        case RANAL_MSG_PUT_REQ:
                /* PUT request: if portals didn't claim it, NAK the sender */
                CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);
                lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);

                if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
                        break;

                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);
                if (tx == NULL)
                        break;

                tx->tx_msg.ram_u.completion.racm_cookie =
                        msg->ram_u.putreq.raprm_cookie;
                kranal_post_fma(conn, tx);
                break;

        case RANAL_MSG_PUT_NAK:
                /* Peer refused our PUT: complete the matching tx with error */
                CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_PUT_ACK:
                /* Peer accepted our PUT: start the RDMA transfer */
                CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.putack.rapam_src_cookie);
                if (tx == NULL)
                        break;

                kranal_rdma(tx, RANAL_MSG_PUT_DONE,
                            &msg->ram_u.putack.rapam_desc,
                            msg->ram_u.putack.rapam_desc.rard_nob,
                            msg->ram_u.putack.rapam_dst_cookie);
                break;

        case RANAL_MSG_PUT_DONE:
                /* RDMA for a PUT we ACKed has completed */
                CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, 0);
                break;

        case RANAL_MSG_GET_REQ:
                /* GET request: if portals didn't claim it, NAK the sender */
                CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);
                lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn);

                if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
                        break;

                tx = kranal_new_tx_msg(0, RANAL_MSG_GET_NAK);
                if (tx == NULL)
                        break;

                tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie;
                kranal_post_fma(conn, tx);
                break;

        case RANAL_MSG_GET_NAK:
                /* Peer refused our GET: complete the matching tx with error */
                CDEBUG(D_NET, "RX GET_NAK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_GET_DONE:
                /* Peer completed the RDMA for our GET */
                CDEBUG(D_NET, "RX GET_DONE on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, 0);
                break;
        }

 out:
        /* Discard anything portals didn't consume */
        if (conn->rac_rxmsg != NULL)
                kranal_consume_rxmsg(conn, NULL, 0);

        /* check again later */
        kranal_schedule_conn(conn);
}
1833
1834 void
1835 kranal_complete_closed_conn (kra_conn_t *conn)
1836 {
1837         kra_tx_t   *tx;
1838         int         nfma;
1839         int         nreplies;
1840
1841         LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
1842         LASSERT (list_empty(&conn->rac_list));
1843         LASSERT (list_empty(&conn->rac_hashlist));
1844
1845         for (nfma = 0; !list_empty(&conn->rac_fmaq); nfma++) {
1846                 tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1847
1848                 list_del(&tx->tx_list);
1849                 kranal_tx_done(tx, -ECONNABORTED);
1850         }
1851
1852         LASSERT (list_empty(&conn->rac_rdmaq));
1853
1854         for (nreplies = 0; !list_empty(&conn->rac_replyq); nreplies++) {
1855                 tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
1856
1857                 list_del(&tx->tx_list);
1858                 kranal_tx_done(tx, -ECONNABORTED);
1859         }
1860
1861         CDEBUG(D_WARNING, "Closed conn %p -> "LPX64": nmsg %d nreplies %d\n",
1862                conn, conn->rac_peer->rap_nid, nfma, nreplies);
1863 }
1864
1865 int
1866 kranal_process_new_conn (kra_conn_t *conn)
1867 {
1868         RAP_RETURN   rrc;
1869         
1870         rrc = RapkCompleteSync(conn->rac_rihandle, 1);
1871         if (rrc == RAP_SUCCESS)
1872                 return 0;
1873
1874         LASSERT (rrc == RAP_NOT_DONE);
1875         if (!time_after_eq(jiffies, conn->rac_last_tx + 
1876                            conn->rac_timeout * HZ))
1877                 return -EAGAIN;
1878
1879         /* Too late */
1880         rrc = RapkCompleteSync(conn->rac_rihandle, 0);
1881         LASSERT (rrc == RAP_SUCCESS);
1882         return -ETIMEDOUT;
1883 }
1884
int
kranal_scheduler (void *arg)
{
        /* Per-device scheduler thread.  Loops until shutdown, doing three
         * jobs each pass while holding dev->rad_lock between them:
         *   1. service completion queues when the device callback fired,
         *   2. run RX/TX processing for every scheduled connection,
         *   3. advance handshakes on new connections, with exponential
         *      backoff up to ~HZ between retries.
         * Sleeps on dev->rad_waitq only when a full pass dropped no locks
         * (i.e. there was no work), waking early for the soonest new-conn
         * deadline. */
        kra_device_t     *dev = (kra_device_t *)arg;
        wait_queue_t      wait;
        char              name[16];
        kra_conn_t       *conn;
        unsigned long     flags;
        unsigned long     deadline;             /* next retry time for a new conn */
        unsigned long     soonest;              /* earliest deadline seen this pass */
        int               nsoonest;             /* # new conns with pending deadlines */
        long              timeout;
        struct list_head *tmp;
        struct list_head *nxt;
        int               rc;
        int               dropped_lock;         /* did this pass do any work? */
        int               busy_loops = 0;       /* passes since last resched/sleep */

        snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
        kportal_daemonize(name);
        kportal_blockallsigs();

        dev->rad_scheduler = current;
        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&dev->rad_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* Safe: kra_shutdown only set when quiescent */

                /* Yield the CPU periodically so a busy scheduler doesn't
                 * starve other threads */
                if (busy_loops++ >= RANAL_RESCHED) {
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        our_cond_resched();
                        busy_loops = 0;

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                dropped_lock = 0;

                if (dev->rad_ready) {
                        /* Device callback fired since I last checked it */
                        dev->rad_ready = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);
                        dropped_lock = 1;

                        kranal_check_rdma_cq(dev);
                        kranal_check_fma_cq(dev);

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                /* Service every connection queued for attention.
                 * NOTE(review): the lock is dropped inside this walk;
                 * presumably only this thread removes entries from
                 * rad_ready_conns so 'nxt' stays valid — confirm. */
                list_for_each_safe(tmp, nxt, &dev->rad_ready_conns) {
                        conn = list_entry(tmp, kra_conn_t, rac_schedlist);

                        list_del_init(&conn->rac_schedlist);
                        LASSERT (conn->rac_scheduled);
                        conn->rac_scheduled = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);
                        dropped_lock = 1;

                        kranal_check_fma_rx(conn);
                        kranal_process_fmaq(conn);

                        if (conn->rac_state == RANAL_CONN_CLOSED)
                                kranal_complete_closed_conn(conn);

                        kranal_conn_decref(conn);
                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                nsoonest = 0;
                soonest = jiffies;

                /* Advance connection handshakes whose retry time has come */
                list_for_each_safe(tmp, nxt, &dev->rad_new_conns) {
                        conn = list_entry(tmp, kra_conn_t, rac_schedlist);

                        deadline = conn->rac_last_tx + conn->rac_keepalive;
                        if (time_after_eq(jiffies, deadline)) {
                                /* Time to process this new conn */
                                spin_unlock_irqrestore(&dev->rad_lock, flags);
                                dropped_lock = 1;

                                rc = kranal_process_new_conn(conn);
                                if (rc != -EAGAIN) {
                                        /* All done with this conn */
                                        spin_lock_irqsave(&dev->rad_lock, flags);
                                        list_del_init(&conn->rac_schedlist);
                                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                                        kranal_conn_decref(conn);
                                        spin_lock_irqsave(&dev->rad_lock, flags);
                                        continue;
                                }

                                /* retry with exponential backoff until HZ */
                                if (conn->rac_keepalive == 0)
                                        conn->rac_keepalive = 1;
                                else if (conn->rac_keepalive <= HZ)
                                        conn->rac_keepalive *= 2;
                                else
                                        conn->rac_keepalive += HZ;

                                deadline = conn->rac_last_tx + conn->rac_keepalive;
                                spin_lock_irqsave(&dev->rad_lock, flags);
                        }

                        /* Does this conn need attention soonest? */
                        if (nsoonest++ == 0 ||
                            !time_after_eq(deadline, soonest))
                                soonest = deadline;
                }

                if (dropped_lock)               /* may sleep iff I didn't drop the lock */
                        continue;

                /* No work this pass: sleep.  Enqueue on the waitq BEFORE
                 * releasing the lock so a wakeup can't be missed. */
                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&dev->rad_waitq, &wait);
                spin_unlock_irqrestore(&dev->rad_lock, flags);

                if (nsoonest == 0) {
                        /* no pending deadlines: sleep until woken */
                        busy_loops = 0;
                        schedule();
                } else {
                        /* sleep only until the earliest new-conn deadline */
                        timeout = (long)(soonest - jiffies);
                        if (timeout > 0) {
                                busy_loops = 0;
                                schedule_timeout(timeout);
                        }
                }

                remove_wait_queue(&dev->rad_waitq, &wait);
                set_current_state(TASK_RUNNING);
                spin_lock_irqsave(&dev->rad_lock, flags);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);

        dev->rad_scheduler = NULL;
        kranal_thread_fini();
        return 0;
}
2028
2029
/* Portals lib-NAL method table: hooks this LND's send/receive/distance
 * implementations into the portals library (GNU-style labelled
 * initializers, matching the file's convention). */
lib_nal_t kranal_lib = {
        libnal_data:        &kranal_data,      /* NAL private data */
        libnal_send:         kranal_send,
        libnal_send_pages:   kranal_send_pages,
        libnal_recv:         kranal_recv,
        libnal_recv_pages:   kranal_recv_pages,
        libnal_dist:         kranal_dist
};