/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "ranal.h"

int
kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
{
        /* I would guess that if kranal_get_peer (nid) == NULL,
           and we're not routing, then 'nid' is very distant :) */
        if (nal->libnal_ni.ni_pid.nid == nid) {
                *dist = 0;
        } else {
                *dist = 1;
        }

        return 0;
}

void
kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
{
        kra_device_t *dev;
        int           i;
        unsigned long flags;

        CDEBUG(D_NET, "callback for device %d\n", devid);

        for (i = 0; i < kranal_data.kra_ndevs; i++) {
                dev = &kranal_data.kra_devices[i];
                if (dev->rad_id != devid)
                        continue;

                spin_lock_irqsave(&dev->rad_lock, flags);

                if (!dev->rad_ready) {
                        dev->rad_ready = 1;
                        wake_up(&dev->rad_waitq);
                }

                spin_unlock_irqrestore(&dev->rad_lock, flags);
                return;
        }

        CWARN("callback for unknown device %d\n", devid);
}

void
kranal_schedule_conn(kra_conn_t *conn)
{
        kra_device_t    *dev = conn->rac_device;
        unsigned long    flags;

        spin_lock_irqsave(&dev->rad_lock, flags);

        if (!conn->rac_scheduled) {
                kranal_conn_addref(conn);       /* +1 ref for scheduler */
                conn->rac_scheduled = 1;
                list_add_tail(&conn->rac_schedlist, &dev->rad_connq);
                wake_up(&dev->rad_waitq);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);
}

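/* Grab an idle tx descriptor.  Blocking callers take from kra_idle_txs and
 * sleep on kra_idle_tx_waitq until one shows up; non-blocking callers
 * (may_block == 0) may instead dip into the reserved kra_idle_nblk_txs
 * pool, which is never waited on, so exhausting it fails immediately with
 * NULL rather than stalling. */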
kra_tx_t *
kranal_get_idle_tx (int may_block)
{
        unsigned long  flags;
        kra_tx_t      *tx = NULL;

        for (;;) {
                spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

                /* "normal" descriptor is free */
                if (!list_empty(&kranal_data.kra_idle_txs)) {
                        tx = list_entry(kranal_data.kra_idle_txs.next,
                                        kra_tx_t, tx_list);
                        break;
                }

                if (!may_block) {
                        /* may dip into reserve pool */
                        if (list_empty(&kranal_data.kra_idle_nblk_txs)) {
                                CERROR("reserved tx desc pool exhausted\n");
                                break;
                        }

                        tx = list_entry(kranal_data.kra_idle_nblk_txs.next,
                                        kra_tx_t, tx_list);
                        break;
                }

                /* block for idle tx */
                spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

                wait_event(kranal_data.kra_idle_tx_waitq,
                           !list_empty(&kranal_data.kra_idle_txs));
        }

        if (tx != NULL) {
                list_del(&tx->tx_list);

                /* Allocate a new completion cookie.  It might not be
                 * needed, but we've got a lock right now... */
                tx->tx_cookie = kranal_data.kra_next_tx_cookie++;

                LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
                LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
                LASSERT (tx->tx_conn == NULL);
                LASSERT (tx->tx_libmsg[0] == NULL);
                LASSERT (tx->tx_libmsg[1] == NULL);
        }

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        return tx;
}

void
kranal_init_msg(kra_msg_t *msg, int type)
{
        msg->ram_magic = RANAL_MSG_MAGIC;
        msg->ram_version = RANAL_MSG_VERSION;
        msg->ram_type = type;
        msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
        /* ram_connstamp gets set when FMA is sent */
}

kra_tx_t *
kranal_new_tx_msg (int may_block, int type)
{
        kra_tx_t *tx = kranal_get_idle_tx(may_block);

        if (tx == NULL)
                return NULL;

        kranal_init_msg(&tx->tx_msg, type);
        return tx;
}

int
kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                               int offset, int nob)
{
        /* For now this is almost identical to kranal_setup_virt_buffer, but we
         * could "flatten" the payload into a single contiguous buffer ready
         * for sending direct over an FMA if we ever needed to. */

        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
        LASSERT (nob >= 0);

        if (nob == 0) {
                tx->tx_buffer = NULL;
        } else {
                LASSERT (niov > 0);

                while (offset >= iov->iov_len) {
                        offset -= iov->iov_len;
                        niov--;
                        iov++;
                        LASSERT (niov > 0);
                }

                if (nob > iov->iov_len - offset) {
                        CERROR("Can't handle multiple vaddr fragments\n");
                        return -EMSGSIZE;
                }

                tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        }

        tx->tx_buftype = RANAL_BUF_IMMEDIATE;
        tx->tx_nob = nob;
        return 0;
}

int
kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                          int offset, int nob)
{
        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");
                return -EMSGSIZE;
        }

        tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        return 0;
}

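/* Describe a paged (kiov) payload as the array of RAP_PHYS_REGION entries in
 * tx->tx_phys, one per page.  Beyond the first fragment every page must
 * start at offset 0 and (except the last) fill a whole page, i.e. the
 * payload must be contiguous in I/O virtual memory: a gap fails with
 * -EINVAL, and more than PTL_MD_MAX_IOV pages fails with -EMSGSIZE. */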
int
kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
                          int offset, int nob)
{
        RAP_PHYS_REGION *phys = tx->tx_phys;
        int              resid;

        CDEBUG(D_NET, "nkiov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));

        phys->Address = kranal_page2phys(kiov->kiov_page);
        phys++;

        resid = nob - (kiov->kiov_len - offset);
        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR("Can't make payload contiguous in I/O VM: "
                               "page %d, offset %d, len %d\n",
                               (int)(phys - tx->tx_phys),
                               kiov->kiov_offset, kiov->kiov_len);
                        return -EINVAL;
                }

                if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) {
                        CERROR("payload too big (%d)\n", (int)(phys - tx->tx_phys));
                        return -EMSGSIZE;
                }

                phys->Address = kranal_page2phys(kiov->kiov_page);
                phys++;

                resid -= PAGE_SIZE;
        }

        tx->tx_phys_npages = phys - tx->tx_phys;
        return 0;
}

static inline int
kranal_setup_rdma_buffer (kra_tx_t *tx, int niov,
                          struct iovec *iov, ptl_kiov_t *kiov,
                          int offset, int nob)
{
        LASSERT ((iov == NULL) != (kiov == NULL));

        if (kiov != NULL)
                return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);

        return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
}

void
kranal_map_buffer (kra_tx_t *tx)
{
        kra_conn_t     *conn = tx->tx_conn;
        kra_device_t   *dev = conn->rac_device;
        RAP_RETURN      rrc;

        LASSERT (current == dev->rad_scheduler);

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_MAPPED:
        case RANAL_BUF_VIRT_MAPPED:
                break;

        case RANAL_BUF_PHYS_UNMAPPED:
                rrc = RapkRegisterPhys(dev->rad_handle,
                                       tx->tx_phys, tx->tx_phys_npages,
                                       &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
                break;

        case RANAL_BUF_VIRT_UNMAPPED:
                rrc = RapkRegisterMemory(dev->rad_handle,
                                         tx->tx_buffer, tx->tx_nob,
                                         &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
                break;
        }
}

void
kranal_unmap_buffer (kra_tx_t *tx)
{
        kra_device_t   *dev;
        RAP_RETURN      rrc;

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_UNMAPPED:
        case RANAL_BUF_VIRT_UNMAPPED:
                break;

        case RANAL_BUF_PHYS_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
                break;

        case RANAL_BUF_VIRT_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
                break;
        }
}

void
kranal_tx_done (kra_tx_t *tx, int completion)
{
        ptl_err_t        ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
        unsigned long    flags;
        int              i;

        LASSERT (!in_interrupt());

        kranal_unmap_buffer(tx);

        for (i = 0; i < 2; i++) {
                /* tx may have up to 2 libmsgs to finalise */
                if (tx->tx_libmsg[i] == NULL)
                        continue;

                lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
                tx->tx_libmsg[i] = NULL;
        }

        tx->tx_buftype = RANAL_BUF_NONE;
        tx->tx_msg.ram_type = RANAL_MSG_NONE;
        tx->tx_conn = NULL;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        if (tx->tx_isnblk) {
                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
        } else {
                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
                wake_up(&kranal_data.kra_idle_tx_waitq);
        }

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
}

kra_conn_t *
kranal_find_conn_locked (kra_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->rap_conns) {
                return list_entry(tmp, kra_conn_t, rac_list);
        }

        return NULL;
}

void
kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
{
        unsigned long    flags;

        tx->tx_conn = conn;

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);

        kranal_schedule_conn(conn);
}

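/* Queue tx for sending to 'nid'.  The common case (peer exists and already
 * has a connection) is handled under the shared global lock; only when a
 * connection must be initiated do we drop it, take the write lock, and
 * repeat both lookups, since the peer table may have changed in between. */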
void
kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
{
        unsigned long    flags;
        kra_peer_t      *peer;
        kra_conn_t      *conn;
        unsigned long    now;
        rwlock_t        *g_lock = &kranal_data.kra_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */

        read_lock(g_lock);

        peer = kranal_find_peer_locked(nid);
        if (peer == NULL) {
                read_unlock(g_lock);
                kranal_tx_done(tx, -EHOSTUNREACH);
                return;
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                kranal_post_fma(conn, tx);
                read_unlock(g_lock);
                return;
        }

        /* Making one or more connections; I'll need a write lock... */
        read_unlock(g_lock);
        write_lock_irqsave(g_lock, flags);

        peer = kranal_find_peer_locked(nid);
        if (peer == NULL) {
                write_unlock_irqrestore(g_lock, flags);
                kranal_tx_done(tx, -EHOSTUNREACH);
                return;
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kranal_post_fma(conn, tx);
                write_unlock_irqrestore(g_lock, flags);
                return;
        }

        LASSERT (peer->rap_persistence > 0);

        if (!peer->rap_connecting) {
                LASSERT (list_empty(&peer->rap_tx_queue));

                now = CURRENT_SECONDS;
                if (now < peer->rap_reconnect_time) {
                        write_unlock_irqrestore(g_lock, flags);
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                peer->rap_connecting = 1;
                kranal_peer_addref(peer); /* extra ref for connd */

                spin_lock(&kranal_data.kra_connd_lock);

                list_add_tail(&peer->rap_connd_list,
                              &kranal_data.kra_connd_peers);
                wake_up(&kranal_data.kra_connd_waitq);

                spin_unlock(&kranal_data.kra_connd_lock);
        }

        /* A connection is being established; queue the message... */
        list_add_tail(&tx->tx_list, &peer->rap_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}

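/* Post the RDMA described by 'sink' and park tx on rac_rdmaq until the
 * completion event arrives; the GET_DONE/PUT_DONE message carrying 'cookie'
 * that is initialised here is only FMA'd to the peer after that event.  A
 * zero-length RDMA skips the transfer and posts the completion message
 * immediately. */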
void
kranal_rdma(kra_tx_t *tx, int type,
            kra_rdma_desc_t *sink, int nob, __u64 cookie)
{
        kra_conn_t   *conn = tx->tx_conn;
        RAP_RETURN    rrc;
        unsigned long flags;

        LASSERT (kranal_tx_mapped(tx));
        LASSERT (nob <= sink->rard_nob);
        LASSERT (nob <= tx->tx_nob);

        /* No actual race with scheduler sending CLOSE (I'm she!) */
        LASSERT (current == conn->rac_device->rad_scheduler);

        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
        tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
        tx->tx_rdma_desc.DstPtr = sink->rard_addr;
        tx->tx_rdma_desc.DstKey = sink->rard_key;
        tx->tx_rdma_desc.Length = nob;
        tx->tx_rdma_desc.AppPtr = tx;

        /* prep final completion message */
        kranal_init_msg(&tx->tx_msg, type);
        tx->tx_msg.ram_u.completion.racm_cookie = cookie;

        if (nob == 0) { /* Immediate completion */
                kranal_post_fma(conn, tx);
                return;
        }

        LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */

        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
        LASSERT (rrc == RAP_SUCCESS);

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);
}

int
kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
{
        __u32      nob_received = nob;
        RAP_RETURN rrc;

        LASSERT (conn->rac_rxmsg != NULL);
        CDEBUG(D_NET, "Consuming %p\n", conn);

        rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
                             &nob_received, sizeof(kra_msg_t));
        LASSERT (rrc == RAP_SUCCESS);

        conn->rac_rxmsg = NULL;

        if (nob_received < nob) {
                CWARN("Incomplete immediate msg from "LPX64
                      ": expected %d, got %d\n",
                      conn->rac_peer->rap_nid, nob, nob_received);
                return -EPROTO;
        }

        return 0;
}

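/* Common send path for both iovec and kiov payloads (exactly one of iov and
 * kiov may be non-NULL).  PUTs and GETs too big for FMA become rendezvous
 * requests (PUT_REQ/GET_REQ) that are completed by RDMA; anything small
 * enough (<= RANAL_FMA_MAX_DATA and <= kra_max_immediate) falls through to
 * the RANAL_MSG_IMMEDIATE case at the bottom. */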
ptl_err_t
kranal_do_send (lib_nal_t    *nal,
                void         *private,
                lib_msg_t    *libmsg,
                ptl_hdr_t    *hdr,
                int           type,
                ptl_nid_t     nid,
                ptl_pid_t     pid,
                unsigned int  niov,
                struct iovec *iov,
                ptl_kiov_t   *kiov,
                int           offset,
                int           nob)
{
        kra_conn_t *conn;
        kra_tx_t   *tx;
        int         rc;

        /* NB 'private' is different depending on what we're sending... */

        CDEBUG(D_NET, "sending %d bytes in %d frags to nid:"LPX64" pid %d\n",
               nob, niov, nid, pid);

        LASSERT (nob == 0 || niov > 0);
        LASSERT (niov <= PTL_MD_MAX_IOV);

        LASSERT (!in_interrupt());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch (type) {
        default:
                LBUG();

        case PTL_MSG_REPLY: {
                /* reply's 'private' is the conn that received the GET_REQ */
                conn = private;
                LASSERT (conn->rac_rxmsg != NULL);

                if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
                        if (nob > RANAL_FMA_MAX_DATA) {
                                CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
                                       nob, nid);
                                return PTL_FAIL;
                        }
                        break;                  /* RDMA not expected */
                }

                /* Incoming message consistent with immediate reply? */
                if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
                        CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
                               nid, conn->rac_rxmsg->ram_type);
                        return PTL_FAIL;
                }

                tx = kranal_get_idle_tx(0);
                if (tx == NULL)
                        return PTL_FAIL;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_conn = conn;
                tx->tx_libmsg[0] = libmsg;

                kranal_map_buffer(tx);
                kranal_rdma(tx, RANAL_MSG_GET_DONE,
                            &conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
                            conn->rac_rxmsg->ram_u.get.ragm_cookie);

                /* flag matched by consuming rx message */
                kranal_consume_rxmsg(conn, NULL, 0);
                return PTL_OK;
        }

        case PTL_MSG_GET:
                LASSERT (niov == 0);
                LASSERT (nob == 0);
                /* We have to consider the eventual sink buffer rather than any
                 * payload passed here (there isn't any, and strictly, looking
                 * inside libmsg is a layering violation).  We send a simple
                 * IMMEDIATE GET if the sink buffer is mapped already and small
                 * enough for FMA */

                if ((libmsg->md->options & PTL_MD_KIOV) == 0 &&
                    libmsg->md->length <= RANAL_FMA_MAX_DATA &&
                    libmsg->md->length <= kranal_tunables.kra_max_immediate)
                        break;

                tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_GET_REQ);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                if ((libmsg->md->options & PTL_MD_KIOV) == 0)
                        rc = kranal_setup_virt_buffer(tx, libmsg->md->md_niov,
                                                      libmsg->md->md_iov.iov,
                                                      0, libmsg->md->length);
                else
                        rc = kranal_setup_phys_buffer(tx, libmsg->md->md_niov,
                                                      libmsg->md->md_iov.kiov,
                                                      0, libmsg->md->length);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg);
                if (tx->tx_libmsg[1] == NULL) {
                        CERROR("Can't create reply for GET to "LPX64"\n", nid);
                        kranal_tx_done(tx, -ENOMEM);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[0] = libmsg;
                tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, nid);
                return PTL_OK;

        case PTL_MSG_ACK:
                LASSERT (nob == 0);
                break;

        case PTL_MSG_PUT:
                if (kiov == NULL &&             /* not paged */
                    nob <= RANAL_FMA_MAX_DATA && /* small enough */
                    nob <= kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[0] = libmsg;
                tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, nid);
                return PTL_OK;
        }

        LASSERT (kiov == NULL);
        LASSERT (nob <= RANAL_FMA_MAX_DATA);

        tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
                                 type == PTL_MSG_REPLY ||
                                 in_interrupt()),
                               RANAL_MSG_IMMEDIATE);
        if (tx == NULL)
                return PTL_NO_SPACE;

        rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
        if (rc != 0) {
                kranal_tx_done(tx, rc);
                return PTL_FAIL;
        }

        tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
        tx->tx_libmsg[0] = libmsg;
        kranal_launch_tx(tx, nid);
        return PTL_OK;
}

ptl_err_t
kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
             ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
             unsigned int niov, struct iovec *iov,
             size_t offset, size_t len)
{
        return kranal_do_send(nal, private, cookie,
                              hdr, type, nid, pid,
                              niov, iov, NULL,
                              offset, len);
}

ptl_err_t
kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
                   ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                   unsigned int niov, ptl_kiov_t *kiov,
                   size_t offset, size_t len)
{
        return kranal_do_send(nal, private, cookie,
                              hdr, type, nid, pid,
                              niov, NULL, kiov,
                              offset, len);
}

ptl_err_t
kranal_do_recv (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
                unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
                int offset, int mlen, int rlen)
{
        kra_conn_t  *conn = private;
        kra_msg_t   *rxmsg = conn->rac_rxmsg;
        kra_tx_t    *tx;
        void        *buffer;
        int          rc;

        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        CDEBUG(D_NET, "conn %p, rxmsg %p, libmsg %p\n", conn, rxmsg, libmsg);

        if (libmsg == NULL) {
                /* GET or ACK or portals is discarding */
                LASSERT (mlen == 0);
                lib_finalize(nal, NULL, libmsg, PTL_OK);
                return PTL_OK;
        }

        switch (rxmsg->ram_type) {
        default:
                LBUG();
                return PTL_FAIL;

        case RANAL_MSG_IMMEDIATE:
                if (mlen == 0) {
                        buffer = NULL;
                } else if (kiov != NULL) {
                        CERROR("Can't recv immediate into paged buffer\n");
                        return PTL_FAIL;
                } else {
                        LASSERT (niov > 0);
                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                        }
                        if (mlen > iov->iov_len - offset) {
                                CERROR("Can't handle immediate frags\n");
                                return PTL_FAIL;
                        }
                        buffer = ((char *)iov->iov_base) + offset;
                }
                rc = kranal_consume_rxmsg(conn, buffer, mlen);
                lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
                return PTL_OK;

        case RANAL_MSG_PUT_REQ:
                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_conn = conn;
                kranal_map_buffer(tx);

                tx->tx_msg.ram_u.putack.rapam_src_cookie =
                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;

                tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */

                kranal_post_fma(conn, tx);

                /* flag matched by consuming rx message */
                kranal_consume_rxmsg(conn, NULL, 0);
                return PTL_OK;
        }
}

ptl_err_t
kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
             unsigned int niov, struct iovec *iov,
             size_t offset, size_t mlen, size_t rlen)
{
        return kranal_do_recv(nal, private, msg, niov, iov, NULL,
                              offset, mlen, rlen);
}

ptl_err_t
kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
                   unsigned int niov, ptl_kiov_t *kiov,
                   size_t offset, size_t mlen, size_t rlen)
{
        return kranal_do_recv(nal, private, msg, niov, NULL, kiov,
                              offset, mlen, rlen);
}

int
kranal_thread_start (int(*fn)(void *arg), void *arg)
{
        long    pid = kernel_thread(fn, arg, 0);

        if (pid < 0)
                return (int)pid;

        atomic_inc(&kranal_data.kra_nthreads);
        return 0;
}

void
kranal_thread_fini (void)
{
        atomic_dec(&kranal_data.kra_nthreads);
}

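/* Return 0 if conn looks healthy, -ETIMEDOUT otherwise.  Healthy means the
 * peer has been heard from (or its CLOSE seen, when closing) within
 * rac_timeout seconds and nothing on the fmaq/rdmaq/replyq has been stuck
 * longer than that; as a side effect, schedule the conn if a keepalive NOOP
 * is due. */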
int
kranal_check_conn_timeouts (kra_conn_t *conn)
{
        kra_tx_t          *tx;
        struct list_head  *ttmp;
        unsigned long      flags;
        long               timeout;
        unsigned long      now = jiffies;

        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
                 conn->rac_state == RANAL_CONN_CLOSING);

        if (!conn->rac_close_sent &&
            time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                /* not sent in a while; schedule conn so scheduler sends a keepalive */
                CDEBUG(D_NET, "Scheduling keepalive %p->"LPX64"\n",
                       conn, conn->rac_peer->rap_nid);
                kranal_schedule_conn(conn);
        }

        timeout = conn->rac_timeout * HZ;

        if (!conn->rac_close_recvd &&
            time_after_eq(now, conn->rac_last_rx + timeout)) {
                CERROR("%s received from "LPX64" within %lu seconds\n",
                       (conn->rac_state == RANAL_CONN_ESTABLISHED) ?
                       "Nothing" : "CLOSE not",
                       conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
                return -ETIMEDOUT;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                return 0;

        /* Check the conn's queues are moving.  These are "belt+braces" checks,
         * in case of hardware/software errors that make this conn seem
         * responsive even though it isn't progressing its message queues. */

        spin_lock_irqsave(&conn->rac_lock, flags);

        list_for_each (ttmp, &conn->rac_fmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_rdmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        return 0;
}

void
kranal_reaper_check (int idx, unsigned long *min_timeoutp)
{
        struct list_head  *conns = &kranal_data.kra_conns[idx];
        struct list_head  *ctmp;
        kra_conn_t        *conn;
        unsigned long      flags;
        int                rc;

 again:
        /* NB. We expect to check all the conns and not find any problems, so
         * we just use a shared lock while we take a look... */
        read_lock(&kranal_data.kra_global_lock);

        list_for_each (ctmp, conns) {
                conn = list_entry(ctmp, kra_conn_t, rac_hashlist);

                if (conn->rac_timeout < *min_timeoutp)
                        *min_timeoutp = conn->rac_timeout;
                if (conn->rac_keepalive < *min_timeoutp)
                        *min_timeoutp = conn->rac_keepalive;

                rc = kranal_check_conn_timeouts(conn);
                if (rc == 0)
                        continue;

                kranal_conn_addref(conn);
                read_unlock(&kranal_data.kra_global_lock);

                CERROR("Conn to "LPX64", cqid %d timed out\n",
                       conn->rac_peer->rap_nid, conn->rac_cqid);

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                switch (conn->rac_state) {
                default:
                        LBUG();

                case RANAL_CONN_ESTABLISHED:
                        kranal_close_conn_locked(conn, -ETIMEDOUT);
                        break;

                case RANAL_CONN_CLOSING:
                        kranal_terminate_conn_locked(conn);
                        break;
                }

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

                kranal_conn_decref(conn);

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock(&kranal_data.kra_global_lock);
}

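/* Connection daemon: handshakes incoming acceptor sockets queued on
 * kra_connd_acceptq and initiates connections for peers queued on
 * kra_connd_peers, sleeping on kra_connd_waitq whenever both queues are
 * empty. */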
int
kranal_connd (void *arg)
{
        long               id = (long)arg;
        char               name[16];
        wait_queue_t       wait;
        unsigned long      flags;
        kra_peer_t        *peer;
        kra_acceptsock_t  *ras;
        int                did_something;

        snprintf(name, sizeof(name), "kranal_connd_%02ld", id);
        kportal_daemonize(name);
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

        while (!kranal_data.kra_shutdown) {
                did_something = 0;

                if (!list_empty(&kranal_data.kra_connd_acceptq)) {
                        ras = list_entry(kranal_data.kra_connd_acceptq.next,
                                         kra_acceptsock_t, ras_list);
                        list_del(&ras->ras_list);

                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        CDEBUG(D_NET, "About to handshake someone\n");

                        kranal_conn_handshake(ras->ras_sock, NULL);
                        kranal_free_acceptsock(ras);

                        CDEBUG(D_NET, "Finished handshaking someone\n");

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                if (!list_empty(&kranal_data.kra_connd_peers)) {
                        peer = list_entry(kranal_data.kra_connd_peers.next,
                                          kra_peer_t, rap_connd_list);

                        list_del_init(&peer->rap_connd_list);
                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        kranal_connect(peer);
                        kranal_peer_decref(peer);

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                if (did_something)
                        continue;

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                schedule();

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
        }

        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

        kranal_thread_fini();
        return 0;
}

void
kranal_update_reaper_timeout(long timeout)
{
        unsigned long   flags;

        LASSERT (timeout > 0);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        if (timeout < kranal_data.kra_new_min_timeout)
                kranal_data.kra_new_min_timeout = timeout;

        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
}

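/* Reaper daemon: sweep the conn hash table for timed-out connections,
 * checking 'chunk' hash entries per wakeup so the whole table is covered
 * 'n' times within the smallest connection timeout.  Illustrative numbers
 * (not from this code): with conn_entries == 256, n == 3, p == 1 and a 64s
 * minimum timeout, chunk = (256 * 3 * 1) / 64 = 12 entries per one-second
 * wakeup, so a full sweep takes about 21s, well inside the 64s timeout. */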
int
kranal_reaper (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        long               timeout;
        int                i;
        int                conn_entries = kranal_data.kra_conn_hash_size;
        int                conn_index = 0;
        int                base_index = conn_entries - 1;
        unsigned long      next_check_time = jiffies;
        long               next_min_timeout = MAX_SCHEDULE_TIMEOUT;
        long               current_min_timeout = 1;

        kportal_daemonize("kranal_reaper");
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* I wake up every 'p' seconds to check for timeouts on some
                 * more peers.  I try to check every connection 'n' times
                 * within the global minimum of all keepalive and timeout
                 * intervals, to ensure I attend to every connection within
                 * (n+1)/n times its timeout intervals. */
                const int     p = 1;
                const int     n = 3;
                unsigned long min_timeout;
                int           chunk;

                /* careful with the jiffy wrap... */
                timeout = (long)(next_check_time - jiffies);
                if (timeout > 0) {
                        set_current_state(TASK_INTERRUPTIBLE);
                        add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);

                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                        schedule_timeout(timeout);

                        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                        set_current_state(TASK_RUNNING);
                        remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
                        continue;
                }

                if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
                        /* new min timeout set: restart min timeout scan */
                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        base_index = conn_index - 1;
                        if (base_index < 0)
                                base_index = conn_entries - 1;

                        if (kranal_data.kra_new_min_timeout < current_min_timeout) {
                                current_min_timeout = kranal_data.kra_new_min_timeout;
                                CDEBUG(D_NET, "Set new min timeout %ld\n",
                                       current_min_timeout);
                        }

                        kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
                }
                min_timeout = current_min_timeout;

                spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                LASSERT (min_timeout > 0);

                /* Compute how many table entries to check now so I get round
                 * the whole table fast enough, given that I do this at fixed
                 * intervals of 'p' seconds */
                chunk = conn_entries;
                if (min_timeout > n * p)
                        chunk = (chunk * n * p) / min_timeout;
                if (chunk == 0)
                        chunk = 1;

                for (i = 0; i < chunk; i++) {
                        kranal_reaper_check(conn_index,
                                            &next_min_timeout);
                        conn_index = (conn_index + 1) % conn_entries;
                }

                next_check_time += p * HZ;

                spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                if ((conn_index - chunk <= base_index &&
                     base_index < conn_index) ||
                    (conn_index - conn_entries - chunk <= base_index &&
                     base_index < conn_index - conn_entries)) {

                        /* Scanned all conns: set current_min_timeout... */
                        if (current_min_timeout != next_min_timeout) {
                                current_min_timeout = next_min_timeout;
                                CDEBUG(D_NET, "Set new min timeout %ld\n",
                                       current_min_timeout);
                        }

                        /* ...and restart min timeout scan */
                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        base_index = conn_index - 1;
                        if (base_index < 0)
                                base_index = conn_entries - 1;
                }
        }

        kranal_thread_fini();
        return 0;
}

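/* Drain the device's RDMA completion queue.  RDMAs on a connection complete
 * in posting order, so each event retires the tx at the head of that conn's
 * rac_rdmaq; the tx then moves to the fmaq and the conn is scheduled so the
 * scheduler sends its PUT_DONE/GET_DONE completion message. */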
void
kranal_check_rdma_cq (kra_device_t *dev)
{
        kra_conn_t          *conn;
        kra_tx_t            *tx;
        RAP_RETURN           rrc;
        unsigned long        flags;
        RAP_RDMA_DESCRIPTOR *desc;
        __u32                cqid;
        __u32                event_type;

        for (;;) {
                rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE) {
                        CDEBUG(D_NET, "RDMA CQ %d empty\n", dev->rad_id);
                        return;
                }

                LASSERT (rrc == RAP_SUCCESS);
                LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);

                read_lock(&kranal_data.kra_global_lock);

                conn = kranal_cqid2conn_locked(cqid);
                if (conn == NULL) {
                        /* Conn was destroyed? */
                        CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid);
                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
                LASSERT (rrc == RAP_SUCCESS);

                CDEBUG(D_NET, "Completed %p\n",
                       list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list));

                spin_lock_irqsave(&conn->rac_lock, flags);

                LASSERT (!list_empty(&conn->rac_rdmaq));
                tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
                list_del(&tx->tx_list);

                LASSERT (desc->AppPtr == (void *)tx);
                LASSERT (tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
                         tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);

                list_add_tail(&tx->tx_list, &conn->rac_fmaq);
                tx->tx_qtime = jiffies;

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                /* Get conn's fmaq processed, now I've just put something
                 * there */
                kranal_schedule_conn(conn);

                read_unlock(&kranal_data.kra_global_lock);
        }
}

void
kranal_check_fma_cq (kra_device_t *dev)
{
        kra_conn_t         *conn;
        RAP_RETURN          rrc;
        __u32               cqid;
        __u32               event_type;
        struct list_head   *conns;
        struct list_head   *tmp;
        int                 i;

        for (;;) {
                rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE) {
                        CDEBUG(D_NET, "FMA CQ %d empty\n", dev->rad_id);
                        return;
                }

                LASSERT (rrc == RAP_SUCCESS);

                if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {

                        read_lock(&kranal_data.kra_global_lock);

                        conn = kranal_cqid2conn_locked(cqid);
                        if (conn == NULL) {
                                CDEBUG(D_NET, "FMA CQID lookup %d failed\n",
                                       cqid);
                        } else {
                                CDEBUG(D_NET, "FMA completed: %p CQID %d\n",
                                       conn, cqid);
                                kranal_schedule_conn(conn);
                        }

                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                /* FMA CQ has overflowed: check ALL conns */
                CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);

                for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {

                        read_lock(&kranal_data.kra_global_lock);

                        conns = &kranal_data.kra_conns[i];

                        list_for_each (tmp, conns) {
                                conn = list_entry(tmp, kra_conn_t,
                                                  rac_hashlist);

                                if (conn->rac_device == dev)
                                        kranal_schedule_conn(conn);
                        }

                        /* don't block write lockers for too long... */
                        read_unlock(&kranal_data.kra_global_lock);
                }
        }
}

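/* Push one FMA message (with optional immediate payload) to the peer.
 * Returns 0 on success or -EAGAIN when out of FMA credits, in which case
 * the caller re-queues the message and retries after the next FMA
 * completion event.  Types with RANAL_MSG_FENCE set go via RapkFmaSyncSend
 * rather than RapkFmaSend, presumably so they are fenced behind any
 * preceding RDMA. */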
int
kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
               void *immediate, int immediatenob)
{
        int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
        RAP_RETURN rrc;

        CDEBUG(D_NET, "%p sending msg %p %02x%s [%p for %d]\n",
               conn, msg, msg->ram_type, sync ? "(sync)" : "",
               immediate, immediatenob);

        LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
        LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
                 immediatenob <= RANAL_FMA_MAX_DATA :
                 immediatenob == 0);

        msg->ram_connstamp = conn->rac_my_connstamp;
        msg->ram_seq = conn->rac_tx_seq;

        if (sync)
                rrc = RapkFmaSyncSend(conn->rac_rihandle,
                                      immediate, immediatenob,
                                      msg, sizeof(*msg));
        else
                rrc = RapkFmaSend(conn->rac_rihandle,
                                  immediate, immediatenob,
                                  msg, sizeof(*msg));

        switch (rrc) {
        default:
                LBUG();

        case RAP_SUCCESS:
                conn->rac_last_tx = jiffies;
                conn->rac_tx_seq++;
                return 0;

        case RAP_NOT_DONE:
                return -EAGAIN;
        }
}

1404 void
1405 kranal_process_fmaq (kra_conn_t *conn)
1406 {
1407         unsigned long flags;
1408         int           more_to_do;
1409         kra_tx_t     *tx;
1410         int           rc;
1411         int           expect_reply;
1412
1413         /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
1414          *       However I will be rescheduled some by an FMA completion event
1415          *       when I eventually get some.
1416          * NB 2. Sampling rac_state here races with setting it elsewhere.
1417          *       But it doesn't matter if I try to send a "real" message just
1418          *       as I start closing because I'll get scheduled to send the
1419          *       close anyway. */

        /* Not racing with incoming message processing! */
        LASSERT (current == conn->rac_device->rad_scheduler);

        if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
                if (!list_empty(&conn->rac_rdmaq)) {
                        /* RDMAs in progress */
                        LASSERT (!conn->rac_close_sent);

                        if (time_after_eq(jiffies,
                                          conn->rac_last_tx +
                                          conn->rac_keepalive * HZ)) {
                                CDEBUG(D_NET, "sending NOOP (rdma in progress)\n");
                                kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                                kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                        }
                        return;
                }

                if (conn->rac_close_sent)
                        return;

                CWARN("sending CLOSE to "LPX64"\n", conn->rac_peer->rap_nid);
                kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
                rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                if (rc != 0)
                        return;

                conn->rac_close_sent = 1;
                if (!conn->rac_close_recvd)
                        return;

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_CLOSING)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                return;
        }

        spin_lock_irqsave(&conn->rac_lock, flags);

        if (list_empty(&conn->rac_fmaq)) {

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                if (time_after_eq(jiffies,
                                  conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                        CDEBUG(D_NET, "sending NOOP (idle)\n");
                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                }
                return;
        }

        tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
        list_del(&tx->tx_list);
        more_to_do = !list_empty(&conn->rac_fmaq);

        spin_unlock_irqrestore(&conn->rac_lock, flags);

        expect_reply = 0;
        CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n",
               tx, tx->tx_msg.ram_type, tx->tx_cookie);
        switch (tx->tx_msg.ram_type) {
        default:
                LBUG();

        case RANAL_MSG_IMMEDIATE:
                rc = kranal_sendmsg(conn, &tx->tx_msg,
                                    tx->tx_buffer, tx->tx_nob);
                expect_reply = 0;
                break;

        case RANAL_MSG_PUT_NAK:
        case RANAL_MSG_PUT_DONE:
        case RANAL_MSG_GET_NAK:
        case RANAL_MSG_GET_DONE:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 0;
                break;

        case RANAL_MSG_PUT_REQ:
                tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
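                /* NB the source buffer is mapped only after the request
                 * is away; presumably this can't race with PUT_ACK
                 * processing, since kranal_check_fma_rx() runs on this
                 * same scheduler thread. */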
                kranal_map_buffer(tx);
                expect_reply = 1;
                break;

        case RANAL_MSG_PUT_ACK:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;

        case RANAL_MSG_GET_REQ:
                kranal_map_buffer(tx);
                tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;
        }

        if (rc == -EAGAIN) {
                /* I need credits to send this.  Replace tx at the head of the
                 * fmaq and I'll get rescheduled when credits appear */
                CDEBUG(D_NET, "EAGAIN on %p\n", conn);
                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add(&tx->tx_list, &conn->rac_fmaq);
                spin_unlock_irqrestore(&conn->rac_lock, flags);
                return;
        }

        LASSERT (rc == 0);

        if (!expect_reply) {
                kranal_tx_done(tx, 0);
        } else {
                /* LASSERT(current) above ensures this doesn't race with reply
                 * processing */
                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add_tail(&tx->tx_list, &conn->rac_replyq);
                tx->tx_qtime = jiffies;
                spin_unlock_irqrestore(&conn->rac_lock, flags);
        }

        if (more_to_do) {
                CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn);
                kranal_schedule_conn(conn);
        }
}

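/* Fix the byte order of an RDMA descriptor from a peer of opposite
 * endianness; the key is a struct, so each member is swabbed
 * individually. */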
static inline void
kranal_swab_rdma_desc (kra_rdma_desc_t *d)
{
        __swab64s(&d->rard_key.Key);
        __swab16s(&d->rard_key.Cookie);
        __swab16s(&d->rard_key.MdHandle);
        __swab32s(&d->rard_key.Flags);
        __swab64s(&d->rard_addr.AddressBits);
        __swab32s(&d->rard_nob);
}

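/* Take the tx with matching cookie off conn's reply queue; returns NULL
 * (after a warning) if no tx matches or the match has an unexpected
 * message type. */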
kra_tx_t *
kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
{
        struct list_head *ttmp;
        kra_tx_t         *tx;
        unsigned long     flags;

        spin_lock_irqsave(&conn->rac_lock, flags);

        list_for_each(ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                CDEBUG(D_NET, "Checking %p %02x/"LPX64"\n",
                       tx, tx->tx_msg.ram_type, tx->tx_cookie);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_msg.ram_type != type) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CWARN("Unexpected type %x (%x expected) "
                              "matched reply from "LPX64"\n",
                              tx->tx_msg.ram_type, type,
                              conn->rac_peer->rap_nid);
                        return NULL;
                }

                list_del(&tx->tx_list);
                spin_unlock_irqrestore(&conn->rac_lock, flags);
                return tx;
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        CWARN("Unmatched reply %02x/"LPX64" from "LPX64"\n",
              type, cookie, conn->rac_peer->rap_nid);
        return NULL;
}

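/* Handle one incoming FMA message on 'conn', if any: validate the
 * header (magic/endianness, version, source NID, connstamp, sequence)
 * and dispatch on message type.  Bad messages are simply dropped; any
 * rx not consumed by a portals callback is consumed at 'out'. */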
void
kranal_check_fma_rx (kra_conn_t *conn)
{
        unsigned long flags;
        __u32         seq;
        kra_tx_t     *tx;
        kra_msg_t    *msg;
        void         *prefix;
        RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
        kra_peer_t   *peer = conn->rac_peer;

        if (rrc == RAP_NOT_DONE)
                return;

        CDEBUG(D_NET, "RX on %p\n", conn);

        LASSERT (rrc == RAP_SUCCESS);
        conn->rac_last_rx = jiffies;
        seq = conn->rac_rx_seq++;
        msg = (kra_msg_t *)prefix;

        /* stash message for portals callbacks; they'll NULL
         * rac_rxmsg if they consume it */
        LASSERT (conn->rac_rxmsg == NULL);
        conn->rac_rxmsg = msg;

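        /* A magic that only matches when byte-swapped means the peer has
         * opposite endianness: convert the header, and any RDMA
         * descriptor it carries, in place. */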
        if (msg->ram_magic != RANAL_MSG_MAGIC) {
                if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
                        CERROR("Unexpected magic %08x from "LPX64"\n",
                               msg->ram_magic, peer->rap_nid);
                        goto out;
                }

                __swab32s(&msg->ram_magic);
                __swab16s(&msg->ram_version);
                __swab16s(&msg->ram_type);
                __swab64s(&msg->ram_srcnid);
                __swab64s(&msg->ram_connstamp);
                __swab32s(&msg->ram_seq);

                /* NB message type checked below; NOT here... */
                switch (msg->ram_type) {
                case RANAL_MSG_PUT_ACK:
                        kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
                        break;

                case RANAL_MSG_GET_REQ:
                        kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
                        break;

                default:
                        break;
                }
        }

        if (msg->ram_version != RANAL_MSG_VERSION) {
                CERROR("Unexpected protocol version %d from "LPX64"\n",
                       msg->ram_version, peer->rap_nid);
                goto out;
        }

        if (msg->ram_srcnid != peer->rap_nid) {
                CERROR("Unexpected peer "LPX64" from "LPX64"\n",
                       msg->ram_srcnid, peer->rap_nid);
                goto out;
        }

        if (msg->ram_connstamp != conn->rac_peer_connstamp) {
                CERROR("Unexpected connstamp "LPX64"("LPX64
                       " expected) from "LPX64"\n",
                       msg->ram_connstamp, conn->rac_peer_connstamp,
                       peer->rap_nid);
                goto out;
        }

        if (msg->ram_seq != seq) {
                CERROR("Unexpected sequence number %d(%d expected) from "
                       LPX64"\n", msg->ram_seq, seq, peer->rap_nid);
                goto out;
        }

        if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
                /* This message signals RDMA completion... */
                rrc = RapkFmaSyncWait(conn->rac_rihandle);
                LASSERT (rrc == RAP_SUCCESS);
        }

        if (conn->rac_close_recvd) {
                CERROR("Unexpected message %d after CLOSE from "LPX64"\n",
                       msg->ram_type, conn->rac_peer->rap_nid);
                goto out;
        }

        if (msg->ram_type == RANAL_MSG_CLOSE) {
                CWARN("RX CLOSE from "LPX64"\n", conn->rac_peer->rap_nid);
                conn->rac_close_recvd = 1;
                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_ESTABLISHED)
                        kranal_close_conn_locked(conn, 0);
                else if (conn->rac_state == RANAL_CONN_CLOSING &&
                         conn->rac_close_sent)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                goto out;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                goto out;

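        /* Dispatch.  For PUT, the ACK carries the sink's RDMA descriptor
         * so this (source) side starts the RDMA, completed by PUT_DONE;
         * for GET, the REQ carried this requester's descriptor and the
         * peer signals completion with GET_DONE.  NAKs complete the
         * waiting tx with -ENOENT (no match at the peer). */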
        switch (msg->ram_type) {
        case RANAL_MSG_NOOP:
                /* Nothing to do; just a keepalive */
                CDEBUG(D_NET, "RX NOOP on %p\n", conn);
                break;

        case RANAL_MSG_IMMEDIATE:
                CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);
                lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);
                break;

        case RANAL_MSG_PUT_REQ:
                CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);
                lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);

                if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
                        break;

                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);
                if (tx == NULL)
                        break;

                tx->tx_msg.ram_u.completion.racm_cookie =
                        msg->ram_u.putreq.raprm_cookie;
                kranal_post_fma(conn, tx);
                break;

        case RANAL_MSG_PUT_NAK:
                CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_PUT_ACK:
                CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.putack.rapam_src_cookie);
                if (tx == NULL)
                        break;

                kranal_rdma(tx, RANAL_MSG_PUT_DONE,
                            &msg->ram_u.putack.rapam_desc,
                            msg->ram_u.putack.rapam_desc.rard_nob,
                            msg->ram_u.putack.rapam_dst_cookie);
                break;

        case RANAL_MSG_PUT_DONE:
                CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, 0);
                break;

        case RANAL_MSG_GET_REQ:
                CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);
                lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn);

                if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
                        break;

                tx = kranal_new_tx_msg(0, RANAL_MSG_GET_NAK);
                if (tx == NULL)
                        break;

                tx->tx_msg.ram_u.completion.racm_cookie =
                        msg->ram_u.get.ragm_cookie;
                kranal_post_fma(conn, tx);
                break;

        case RANAL_MSG_GET_NAK:
                CDEBUG(D_NET, "RX GET_NAK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_GET_DONE:
                CDEBUG(D_NET, "RX GET_DONE on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, 0);
                break;
        }

 out:
        if (conn->rac_rxmsg != NULL)
                kranal_consume_rxmsg(conn, NULL, 0);

        /* check again later */
        kranal_schedule_conn(conn);
}

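/* Flush a conn that has reached RANAL_CONN_CLOSED: every tx still on
 * the FMA queue or awaiting a reply is failed with -ECONNABORTED.
 * Outstanding RDMAs must already have drained by this point. */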
void
kranal_complete_closed_conn (kra_conn_t *conn)
{
        kra_tx_t   *tx;

        LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
        LASSERT (list_empty(&conn->rac_list));
        LASSERT (list_empty(&conn->rac_hashlist));

        while (!list_empty(&conn->rac_fmaq)) {
                tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);

                list_del(&tx->tx_list);
                kranal_tx_done(tx, -ECONNABORTED);
        }

        LASSERT (list_empty(&conn->rac_rdmaq));

        while (!list_empty(&conn->rac_replyq)) {
                tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);

                list_del(&tx->tx_list);
                kranal_tx_done(tx, -ECONNABORTED);
        }
}

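/* Per-device scheduler thread: drains the RDMA and FMA completion
 * queues whenever the device callback has flagged rad_ready, works
 * through the scheduled-connection queue, and sleeps only when there
 * is nothing left to do. */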
int
kranal_scheduler (void *arg)
{
        kra_device_t   *dev = (kra_device_t *)arg;
        wait_queue_t    wait;
        char            name[16];
        kra_conn_t     *conn;
        unsigned long   flags;
        int             busy_loops = 0;

        snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
        kportal_daemonize(name);
        kportal_blockallsigs();

        dev->rad_scheduler = current;
        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&dev->rad_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* Safe: kra_shutdown only set when quiescent */

                if (busy_loops++ >= RANAL_RESCHED) {
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        our_cond_resched();
                        busy_loops = 0;

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                if (dev->rad_ready) {
                        /* Device callback fired since I last checked it */
                        dev->rad_ready = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        kranal_check_rdma_cq(dev);
                        kranal_check_fma_cq(dev);

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                if (!list_empty(&dev->rad_connq)) {
                        /* Connection needs attention */
                        conn = list_entry(dev->rad_connq.next,
                                          kra_conn_t, rac_schedlist);
                        list_del_init(&conn->rac_schedlist);
                        LASSERT (conn->rac_scheduled);
                        conn->rac_scheduled = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        kranal_check_fma_rx(conn);
                        kranal_process_fmaq(conn);

                        if (conn->rac_state == RANAL_CONN_CLOSED)
                                kranal_complete_closed_conn(conn);

                        kranal_conn_decref(conn);

                        spin_lock_irqsave(&dev->rad_lock, flags);
                        continue;
                }

                /* recheck that the device callback hasn't fired before sleeping */
                if (dev->rad_ready)
                        continue;

                add_wait_queue(&dev->rad_waitq, &wait);
                set_current_state(TASK_INTERRUPTIBLE);

                spin_unlock_irqrestore(&dev->rad_lock, flags);

                busy_loops = 0;
                schedule();

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&dev->rad_waitq, &wait);

                spin_lock_irqsave(&dev->rad_lock, flags);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);

        dev->rad_scheduler = NULL;
        kranal_thread_fini();
        return 0;
}


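/* Method table through which the portals library calls back into this
 * NAL (note the old GCC "label:" initialiser syntax). */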
lib_nal_t kranal_lib = {
        libnal_data:        &kranal_data,      /* NAL private data */
        libnal_send:         kranal_send,
        libnal_send_pages:   kranal_send_pages,
        libnal_recv:         kranal_recv,
        libnal_recv_pages:   kranal_recv_pages,
        libnal_dist:         kranal_dist
};