lnet/klnds/ralnd/ralnd_cb.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "ranal.h"

int
kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
{
        /* I would guess that if kranal_get_peer (nid) == NULL,
           and we're not routing, then 'nid' is very distant :) */
        if (nal->libnal_ni.ni_pid.nid == nid) {
                *dist = 0;
        } else {
                *dist = 1;
        }

        return 0;
}

void
kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
{
        kra_device_t *dev;
        int           i;
        unsigned long flags;

        CDEBUG(D_NET, "callback for device %d\n", devid);

        for (i = 0; i < kranal_data.kra_ndevs; i++) {

                dev = &kranal_data.kra_devices[i];
                if (dev->rad_id != devid)
                        continue;

                spin_lock_irqsave(&dev->rad_lock, flags);

                if (!dev->rad_ready) {
                        dev->rad_ready = 1;
                        wake_up(&dev->rad_waitq);
                }

                spin_unlock_irqrestore(&dev->rad_lock, flags);
                return;
        }

        CWARN("callback for unknown device %d\n", devid);
}

void
kranal_schedule_conn(kra_conn_t *conn)
{
        kra_device_t    *dev = conn->rac_device;
        unsigned long    flags;

        spin_lock_irqsave(&dev->rad_lock, flags);

        if (!conn->rac_scheduled) {
                kranal_conn_addref(conn);       /* +1 ref for scheduler */
                conn->rac_scheduled = 1;
                list_add_tail(&conn->rac_schedlist, &dev->rad_ready_conns);
                wake_up(&dev->rad_waitq);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);
}

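/* Grab an idle tx descriptor.  There are two pools: callers that may
 * block wait on kra_idle_txs, while non-blocking callers (may_block == 0)
 * may dip into the reserved kra_idle_nblk_txs pool instead of failing
 * outright.  The returned tx carries a fresh completion cookie and has no
 * buffer, message type or conn attached yet (see the LASSERTs below). */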
kra_tx_t *
kranal_get_idle_tx (int may_block)
{
        unsigned long  flags;
        kra_tx_t      *tx = NULL;

        for (;;) {
                spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

                /* "normal" descriptor is free */
                if (!list_empty(&kranal_data.kra_idle_txs)) {
                        tx = list_entry(kranal_data.kra_idle_txs.next,
                                        kra_tx_t, tx_list);
                        break;
                }

                if (!may_block) {
                        /* may dip into reserve pool */
                        if (list_empty(&kranal_data.kra_idle_nblk_txs)) {
                                CERROR("reserved tx desc pool exhausted\n");
                                break;
                        }

                        tx = list_entry(kranal_data.kra_idle_nblk_txs.next,
                                        kra_tx_t, tx_list);
                        break;
                }

                /* block for idle tx */
                spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

                wait_event(kranal_data.kra_idle_tx_waitq,
                           !list_empty(&kranal_data.kra_idle_txs));
        }

        if (tx != NULL) {
                list_del(&tx->tx_list);

                /* Allocate a new completion cookie.  It might not be
                 * needed, but we've got a lock right now... */
                tx->tx_cookie = kranal_data.kra_next_tx_cookie++;

                LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
                LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
                LASSERT (tx->tx_conn == NULL);
                LASSERT (tx->tx_libmsg[0] == NULL);
                LASSERT (tx->tx_libmsg[1] == NULL);
        }

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        return tx;
}

void
kranal_init_msg(kra_msg_t *msg, int type)
{
        msg->ram_magic = RANAL_MSG_MAGIC;
        msg->ram_version = RANAL_MSG_VERSION;
        msg->ram_type = type;
        msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
        /* ram_connstamp gets set when FMA is sent */
}

kra_tx_t *
kranal_new_tx_msg (int may_block, int type)
{
        kra_tx_t *tx = kranal_get_idle_tx(may_block);

        if (tx == NULL)
                return NULL;

        kranal_init_msg(&tx->tx_msg, type);
        return tx;
}

int
kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                               int offset, int nob)
{
        /* For now this is almost identical to kranal_setup_virt_buffer, but we
         * could "flatten" the payload into a single contiguous buffer ready
         * for sending direct over an FMA if we ever needed to. */

        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
        LASSERT (nob >= 0);

        if (nob == 0) {
                tx->tx_buffer = NULL;
        } else {
                LASSERT (niov > 0);

                while (offset >= iov->iov_len) {
                        offset -= iov->iov_len;
                        niov--;
                        iov++;
                        LASSERT (niov > 0);
                }

                if (nob > iov->iov_len - offset) {
                        CERROR("Can't handle multiple vaddr fragments\n");
                        return -EMSGSIZE;
                }

                tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        }

        tx->tx_buftype = RANAL_BUF_IMMEDIATE;
        tx->tx_nob = nob;
        return 0;
}

int
kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                          int offset, int nob)
{
        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");
                return -EMSGSIZE;
        }

        tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        return 0;
}

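/* Build the RAP_PHYS_REGION array for a kiov payload.  The fragments must
 * tile a virtually-contiguous region: after the first page every page must
 * start at offset 0, and only the final page may be partial, since the
 * RDMA engine addresses the payload as one contiguous buffer. */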
int
kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
                          int offset, int nob)
{
        RAP_PHYS_REGION *phys = tx->tx_phys;
        int              resid;

        CDEBUG(D_NET, "nkiov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));

        phys->Address = kranal_page2phys(kiov->kiov_page);
        phys++;

        resid = nob - (kiov->kiov_len - offset);
        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR("Can't make payload contiguous in I/O VM: "
                               "page %d, offset %d, len %d\n",
                               (int)(phys - tx->tx_phys),
                               kiov->kiov_offset, kiov->kiov_len);
                        return -EINVAL;
                }

                if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) {
                        CERROR ("payload too big (%d)\n", (int)(phys - tx->tx_phys));
                        return -EMSGSIZE;
                }

                phys->Address = kranal_page2phys(kiov->kiov_page);
                phys++;

                resid -= PAGE_SIZE;
        }

        tx->tx_phys_npages = phys - tx->tx_phys;
        return 0;
}

static inline int
kranal_setup_rdma_buffer (kra_tx_t *tx, int niov,
                          struct iovec *iov, ptl_kiov_t *kiov,
                          int offset, int nob)
{
        LASSERT ((iov == NULL) != (kiov == NULL));

        if (kiov != NULL)
                return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);

        return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
}

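/* Register tx's buffer with the RapidArray device so it can be the local
 * end of an RDMA.  This must run in the device's scheduler thread (see the
 * LASSERT); the rad_n*map counters appear to exist only to report how much
 * is currently mapped when registration fails. */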
int
kranal_map_buffer (kra_tx_t *tx)
{
        kra_conn_t     *conn = tx->tx_conn;
        kra_device_t   *dev = conn->rac_device;
        RAP_RETURN      rrc;

        LASSERT (current == dev->rad_scheduler);

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_MAPPED:
        case RANAL_BUF_VIRT_MAPPED:
                return 0;

        case RANAL_BUF_PHYS_UNMAPPED:
                rrc = RapkRegisterPhys(dev->rad_handle,
                                       tx->tx_phys, tx->tx_phys_npages,
                                       &tx->tx_map_key);
                if (rrc != RAP_SUCCESS) {
                        CERROR ("Can't map %d pages: dev %d "
                                "phys %u pp %u, virt %u nob %lu\n",
                                tx->tx_phys_npages, dev->rad_id,
                                dev->rad_nphysmap, dev->rad_nppphysmap,
                                dev->rad_nvirtmap, dev->rad_nobvirtmap);
                        return -ENOMEM; /* assume insufficient resources */
                }

                dev->rad_nphysmap++;
                dev->rad_nppphysmap += tx->tx_phys_npages;

                tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
                return 0;

        case RANAL_BUF_VIRT_UNMAPPED:
                rrc = RapkRegisterMemory(dev->rad_handle,
                                         tx->tx_buffer, tx->tx_nob,
                                         &tx->tx_map_key);
                if (rrc != RAP_SUCCESS) {
                        CERROR ("Can't map %d bytes: dev %d "
                                "phys %u pp %u, virt %u nob %lu\n",
                                tx->tx_nob, dev->rad_id,
                                dev->rad_nphysmap, dev->rad_nppphysmap,
                                dev->rad_nvirtmap, dev->rad_nobvirtmap);
                        return -ENOMEM; /* assume insufficient resources */
                }

                dev->rad_nvirtmap++;
                dev->rad_nobvirtmap += tx->tx_nob;

                tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
                return 0;
        }
}

void
kranal_unmap_buffer (kra_tx_t *tx)
{
        kra_device_t   *dev;
        RAP_RETURN      rrc;

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_UNMAPPED:
        case RANAL_BUF_VIRT_UNMAPPED:
                break;

        case RANAL_BUF_PHYS_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);

                dev->rad_nphysmap--;
                dev->rad_nppphysmap -= tx->tx_phys_npages;

                tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
                break;

        case RANAL_BUF_VIRT_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);

                dev->rad_nvirtmap--;
                dev->rad_nobvirtmap -= tx->tx_nob;

                tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
                break;
        }
}

void
kranal_tx_done (kra_tx_t *tx, int completion)
{
        ptl_err_t        ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
        unsigned long    flags;
        int              i;

        LASSERT (!in_interrupt());

        kranal_unmap_buffer(tx);

        for (i = 0; i < 2; i++) {
                /* tx may have up to 2 libmsgs to finalise */
                if (tx->tx_libmsg[i] == NULL)
                        continue;

                lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
                tx->tx_libmsg[i] = NULL;
        }

        tx->tx_buftype = RANAL_BUF_NONE;
        tx->tx_msg.ram_type = RANAL_MSG_NONE;
        tx->tx_conn = NULL;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        if (tx->tx_isnblk) {
                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
        } else {
                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
                wake_up(&kranal_data.kra_idle_tx_waitq);
        }

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
}

kra_conn_t *
kranal_find_conn_locked (kra_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->rap_conns) {
                return list_entry(tmp, kra_conn_t, rac_list);
        }

        return NULL;
}

void
kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
{
        unsigned long    flags;

        tx->tx_conn = conn;

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);

        kranal_schedule_conn(conn);
}

void
kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
{
        unsigned long    flags;
        kra_peer_t      *peer;
        kra_conn_t      *conn;
        unsigned long    now;
        rwlock_t        *g_lock = &kranal_data.kra_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */

        read_lock(g_lock);

        peer = kranal_find_peer_locked(nid);
        if (peer == NULL) {
                read_unlock(g_lock);
                kranal_tx_done(tx, -EHOSTUNREACH);
                return;
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                kranal_post_fma(conn, tx);
                read_unlock(g_lock);
                return;
        }

        /* Making one or more connections; I'll need a write lock... */
        read_unlock(g_lock);
        write_lock_irqsave(g_lock, flags);
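        /* The peer may have vanished, or a conn may have appeared, while
         * the global lock was dropped above, so repeat both lookups under
         * the write lock before committing to connect. */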
501
502         peer = kranal_find_peer_locked(nid);
503         if (peer == NULL) {
504                 write_unlock_irqrestore(g_lock, flags);
505                 kranal_tx_done(tx, -EHOSTUNREACH);
506                 return;
507         }
508
509         conn = kranal_find_conn_locked(peer);
510         if (conn != NULL) {
511                 /* Connection exists; queue message on it */
512                 kranal_post_fma(conn, tx);
513                 write_unlock_irqrestore(g_lock, flags);
514                 return;
515         }
516
517         LASSERT (peer->rap_persistence > 0);
518
519         if (!peer->rap_connecting) {
520                 LASSERT (list_empty(&peer->rap_tx_queue));
521
522                 now = CURRENT_SECONDS;
523                 if (now < peer->rap_reconnect_time) {
524                         write_unlock_irqrestore(g_lock, flags);
525                         kranal_tx_done(tx, -EHOSTUNREACH);
526                         return;
527                 }
528
529                 peer->rap_connecting = 1;
530                 kranal_peer_addref(peer); /* extra ref for connd */
531
532                 spin_lock(&kranal_data.kra_connd_lock);
533
534                 list_add_tail(&peer->rap_connd_list,
535                               &kranal_data.kra_connd_peers);
536                 wake_up(&kranal_data.kra_connd_waitq);
537
538                 spin_unlock(&kranal_data.kra_connd_lock);
539         }
540
541         /* A connection is being established; queue the message... */
542         list_add_tail(&tx->tx_list, &peer->rap_tx_queue);
543
544         write_unlock_irqrestore(g_lock, flags);
545 }
546
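/* Post the RDMA for tx's (already mapped) buffer and queue tx on the
 * conn's rdmaq.  'type' names the completion message (GET_DONE or
 * PUT_DONE) that will go out over FMA once the RDMA completes; a
 * zero-length RDMA skips straight to posting that completion message. */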
void
kranal_rdma(kra_tx_t *tx, int type,
            kra_rdma_desc_t *sink, int nob, __u64 cookie)
{
        kra_conn_t   *conn = tx->tx_conn;
        RAP_RETURN    rrc;
        unsigned long flags;

        LASSERT (kranal_tx_mapped(tx));
        LASSERT (nob <= sink->rard_nob);
        LASSERT (nob <= tx->tx_nob);

        /* No actual race with scheduler sending CLOSE (I'm she!) */
        LASSERT (current == conn->rac_device->rad_scheduler);

        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
        tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
        tx->tx_rdma_desc.DstPtr = sink->rard_addr;
        tx->tx_rdma_desc.DstKey = sink->rard_key;
        tx->tx_rdma_desc.Length = nob;
        tx->tx_rdma_desc.AppPtr = tx;

        /* prep final completion message */
        kranal_init_msg(&tx->tx_msg, type);
        tx->tx_msg.ram_u.completion.racm_cookie = cookie;

        if (nob == 0) { /* Immediate completion */
                kranal_post_fma(conn, tx);
                return;
        }

        LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */

        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
        LASSERT (rrc == RAP_SUCCESS);

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);
}

int
kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
{
        __u32      nob_received = nob;
        RAP_RETURN rrc;

        LASSERT (conn->rac_rxmsg != NULL);
        CDEBUG(D_NET, "Consuming %p\n", conn);

        rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
                             &nob_received, sizeof(kra_msg_t));
        LASSERT (rrc == RAP_SUCCESS);

        conn->rac_rxmsg = NULL;

        if (nob_received < nob) {
                CWARN("Incomplete immediate msg from "LPX64
                      ": expected %d, got %d\n",
                      conn->rac_peer->rap_nid, nob, nob_received);
                return -EPROTO;
        }

        return 0;
}

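/* Common send path.  Small payloads (vaddr-based, no bigger than
 * RANAL_FMA_MAX_DATA or the max_immediate tunable) fall through the
 * switch and go out as a single IMMEDIATE FMA message; larger PUTs and
 * GETs instead send a small request message and let the peer accept or
 * drive an RDMA.  REPLY reuses the conn that received the GET_REQ and
 * RDMAs the payload straight into the sink descriptor the peer sent. */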
ptl_err_t
kranal_do_send (lib_nal_t    *nal,
                void         *private,
                lib_msg_t    *libmsg,
                ptl_hdr_t    *hdr,
                int           type,
                ptl_nid_t     nid,
                ptl_pid_t     pid,
                unsigned int  niov,
                struct iovec *iov,
                ptl_kiov_t   *kiov,
                int           offset,
                int           nob)
{
        kra_conn_t *conn;
        kra_tx_t   *tx;
        int         rc;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending %d bytes in %d frags to nid:"LPX64" pid %d\n",
               nob, niov, nid, pid);

        LASSERT (nob == 0 || niov > 0);
        LASSERT (niov <= PTL_MD_MAX_IOV);

        LASSERT (!in_interrupt());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch(type) {
        default:
                LBUG();

        case PTL_MSG_REPLY: {
                /* reply's 'private' is the conn that received the GET_REQ */
                conn = private;
                LASSERT (conn->rac_rxmsg != NULL);

                if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
                        if (nob > RANAL_FMA_MAX_DATA) {
                                CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
                                       nob, nid);
                                return PTL_FAIL;
                        }
                        break;                  /* RDMA not expected */
                }

                /* Incoming message consistent with RDMA? */
                if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
                        CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
                               nid, conn->rac_rxmsg->ram_type);
                        return PTL_FAIL;
                }

                tx = kranal_get_idle_tx(0);
                if (tx == NULL)
                        return PTL_FAIL;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_conn = conn;
                tx->tx_libmsg[0] = libmsg;

                rc = kranal_map_buffer(tx);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                kranal_rdma(tx, RANAL_MSG_GET_DONE,
                            &conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
                            conn->rac_rxmsg->ram_u.get.ragm_cookie);

                /* flag matched by consuming rx message */
                kranal_consume_rxmsg(conn, NULL, 0);
                return PTL_OK;
        }

        case PTL_MSG_GET:
                LASSERT (niov == 0);
                LASSERT (nob == 0);
                /* We have to consider the eventual sink buffer rather than any
                 * payload passed here (there isn't any, and strictly, looking
                 * inside libmsg is a layering violation).  We send a simple
                 * IMMEDIATE GET if the sink buffer is mapped already and small
                 * enough for FMA */

                if ((libmsg->md->options & PTL_MD_KIOV) == 0 &&
                    libmsg->md->length <= RANAL_FMA_MAX_DATA &&
                    libmsg->md->length <= kranal_tunables.kra_max_immediate)
                        break;

                tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_GET_REQ);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                if ((libmsg->md->options & PTL_MD_KIOV) == 0)
                        rc = kranal_setup_virt_buffer(tx, libmsg->md->md_niov,
                                                      libmsg->md->md_iov.iov,
                                                      0, libmsg->md->length);
                else
                        rc = kranal_setup_phys_buffer(tx, libmsg->md->md_niov,
                                                      libmsg->md->md_iov.kiov,
                                                      0, libmsg->md->length);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg);
                if (tx->tx_libmsg[1] == NULL) {
                        CERROR("Can't create reply for GET to "LPX64"\n", nid);
                        kranal_tx_done(tx, -ENOMEM); /* nonzero => finalize as failure */
                        return PTL_FAIL;
                }

                tx->tx_libmsg[0] = libmsg;
                tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, nid);
                return PTL_OK;

        case PTL_MSG_ACK:
                LASSERT (nob == 0);
                break;

        case PTL_MSG_PUT:
                if (kiov == NULL &&             /* not paged */
                    nob <= RANAL_FMA_MAX_DATA && /* small enough */
                    nob <= kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[0] = libmsg;
                tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, nid);
                return PTL_OK;
        }

        LASSERT (kiov == NULL);
        LASSERT (nob <= RANAL_FMA_MAX_DATA);

        tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
                                 type == PTL_MSG_REPLY ||
                                 in_interrupt()),
                               RANAL_MSG_IMMEDIATE);
        if (tx == NULL)
                return PTL_NO_SPACE;

        rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
        if (rc != 0) {
                kranal_tx_done(tx, rc);
                return PTL_FAIL;
        }

        tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
        tx->tx_libmsg[0] = libmsg;
        kranal_launch_tx(tx, nid);
        return PTL_OK;
}

ptl_err_t
kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
             ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
             unsigned int niov, struct iovec *iov,
             size_t offset, size_t len)
{
        return kranal_do_send(nal, private, cookie,
                              hdr, type, nid, pid,
                              niov, iov, NULL,
                              offset, len);
}

ptl_err_t
kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
                   ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                   unsigned int niov, ptl_kiov_t *kiov,
                   size_t offset, size_t len)
{
        return kranal_do_send(nal, private, cookie,
                              hdr, type, nid, pid,
                              niov, NULL, kiov,
                              offset, len);
}

ptl_err_t
kranal_do_recv (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
                unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
                int offset, int mlen, int rlen)
{
        kra_conn_t  *conn = private;
        kra_msg_t   *rxmsg = conn->rac_rxmsg;
        kra_tx_t    *tx;
        void        *buffer;
        int          rc;

        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        CDEBUG(D_NET, "conn %p, rxmsg %p, libmsg %p\n", conn, rxmsg, libmsg);

        if (libmsg == NULL) {
                /* GET or ACK or portals is discarding */
                LASSERT (mlen == 0);
                lib_finalize(nal, NULL, libmsg, PTL_OK);
                return PTL_OK;
        }

        switch(rxmsg->ram_type) {
        default:
                LBUG();
                return PTL_FAIL;

        case RANAL_MSG_IMMEDIATE:
                if (mlen == 0) {
                        buffer = NULL;
                } else if (kiov != NULL) {
                        CERROR("Can't recv immediate into paged buffer\n");
                        return PTL_FAIL;
                } else {
                        LASSERT (niov > 0);
                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                        }
                        if (mlen > iov->iov_len - offset) {
                                CERROR("Can't handle immediate frags\n");
                                return PTL_FAIL;
                        }
                        buffer = ((char *)iov->iov_base) + offset;
                }
                rc = kranal_consume_rxmsg(conn, buffer, mlen);
                lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
                return PTL_OK;

        case RANAL_MSG_PUT_REQ:
                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_conn = conn;
                rc = kranal_map_buffer(tx);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_msg.ram_u.putack.rapam_src_cookie =
                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;

                tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */

                kranal_post_fma(conn, tx);

                /* flag matched by consuming rx message */
                kranal_consume_rxmsg(conn, NULL, 0);
                return PTL_OK;
        }
}

ptl_err_t
kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
             unsigned int niov, struct iovec *iov,
             size_t offset, size_t mlen, size_t rlen)
{
        return kranal_do_recv(nal, private, msg, niov, iov, NULL,
                              offset, mlen, rlen);
}

ptl_err_t
kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
                   unsigned int niov, ptl_kiov_t *kiov,
                   size_t offset, size_t mlen, size_t rlen)
{
        return kranal_do_recv(nal, private, msg, niov, NULL, kiov,
                              offset, mlen, rlen);
}

int
kranal_thread_start (int(*fn)(void *arg), void *arg)
{
        long    pid = kernel_thread(fn, arg, 0);

        if (pid < 0)
                return (int)pid;

        atomic_inc(&kranal_data.kra_nthreads);
        return 0;
}

void
kranal_thread_fini (void)
{
        atomic_dec(&kranal_data.kra_nthreads);
}

int
kranal_check_conn_timeouts (kra_conn_t *conn)
{
        kra_tx_t          *tx;
        struct list_head  *ttmp;
        unsigned long      flags;
        long               timeout;
        unsigned long      now = jiffies;

        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
                 conn->rac_state == RANAL_CONN_CLOSING);

        if (!conn->rac_close_sent &&
            time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                /* not sent in a while; schedule conn so scheduler sends a keepalive */
                CDEBUG(D_NET, "Scheduling keepalive %p->"LPX64"\n",
                       conn, conn->rac_peer->rap_nid);
                kranal_schedule_conn(conn);
        }

        timeout = conn->rac_timeout * HZ;

        if (!conn->rac_close_recvd &&
            time_after_eq(now, conn->rac_last_rx + timeout)) {
                CERROR("%s received from "LPX64" within %lu seconds\n",
                       (conn->rac_state == RANAL_CONN_ESTABLISHED) ?
                       "Nothing" : "CLOSE not",
                       conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
                return -ETIMEDOUT;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                return 0;

        /* Check the conn's queues are moving.  These are "belt+braces" checks,
         * in case of hardware/software errors that make this conn seem
         * responsive even though it isn't progressing its message queues. */

        spin_lock_irqsave(&conn->rac_lock, flags);

        list_for_each (ttmp, &conn->rac_fmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_rdmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        return 0;
}

void
kranal_reaper_check (int idx, unsigned long *min_timeoutp)
{
        struct list_head  *conns = &kranal_data.kra_conns[idx];
        struct list_head  *ctmp;
        kra_conn_t        *conn;
        unsigned long      flags;
        int                rc;

 again:
        /* NB. We expect to check all the conns and not find any problems, so
         * we just use a shared lock while we take a look... */
        read_lock(&kranal_data.kra_global_lock);

        list_for_each (ctmp, conns) {
                conn = list_entry(ctmp, kra_conn_t, rac_hashlist);

                if (conn->rac_timeout < *min_timeoutp)
                        *min_timeoutp = conn->rac_timeout;
                if (conn->rac_keepalive < *min_timeoutp)
                        *min_timeoutp = conn->rac_keepalive;

                rc = kranal_check_conn_timeouts(conn);
                if (rc == 0)
                        continue;

                kranal_conn_addref(conn);
                read_unlock(&kranal_data.kra_global_lock);

                CERROR("Conn to "LPX64", cqid %d timed out\n",
                       conn->rac_peer->rap_nid, conn->rac_cqid);

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                switch (conn->rac_state) {
                default:
                        LBUG();

                case RANAL_CONN_ESTABLISHED:
                        kranal_close_conn_locked(conn, -ETIMEDOUT);
                        break;

                case RANAL_CONN_CLOSING:
                        kranal_terminate_conn_locked(conn);
                        break;
                }

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

                kranal_conn_decref(conn);

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock(&kranal_data.kra_global_lock);
}

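/* Connection daemon: each connd thread alternates between handshaking
 * freshly-accepted sockets queued on kra_connd_acceptq and actively
 * connecting to peers queued on kra_connd_peers, sleeping on
 * kra_connd_waitq when both queues are empty. */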
int
kranal_connd (void *arg)
{
        long               id = (long)arg;
        char               name[16];
        wait_queue_t       wait;
        unsigned long      flags;
        kra_peer_t        *peer;
        kra_acceptsock_t  *ras;
        int                did_something;

        snprintf(name, sizeof(name), "kranal_connd_%02ld", id);
        kportal_daemonize(name);
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

        while (!kranal_data.kra_shutdown) {
                did_something = 0;

                if (!list_empty(&kranal_data.kra_connd_acceptq)) {
                        ras = list_entry(kranal_data.kra_connd_acceptq.next,
                                         kra_acceptsock_t, ras_list);
                        list_del(&ras->ras_list);

                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        CDEBUG(D_NET,"About to handshake someone\n");

                        kranal_conn_handshake(ras->ras_sock, NULL);
                        kranal_free_acceptsock(ras);

                        CDEBUG(D_NET,"Finished handshaking someone\n");

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                if (!list_empty(&kranal_data.kra_connd_peers)) {
                        peer = list_entry(kranal_data.kra_connd_peers.next,
                                          kra_peer_t, rap_connd_list);

                        list_del_init(&peer->rap_connd_list);
                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        kranal_connect(peer);
                        kranal_peer_decref(peer);

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                if (did_something)
                        continue;

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                schedule();

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
        }

        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

        kranal_thread_fini();
        return 0;
}

void
kranal_update_reaper_timeout(long timeout)
{
        unsigned long   flags;

        LASSERT (timeout > 0);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        if (timeout < kranal_data.kra_new_min_timeout)
                kranal_data.kra_new_min_timeout = timeout;

        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
}

int
kranal_reaper (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        long               timeout;
        int                i;
        int                conn_entries = kranal_data.kra_conn_hash_size;
        int                conn_index = 0;
        int                base_index = conn_entries - 1;
        unsigned long      next_check_time = jiffies;
        long               next_min_timeout = MAX_SCHEDULE_TIMEOUT;
        long               current_min_timeout = 1;

        kportal_daemonize("kranal_reaper");
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* I wake up every 'p' seconds to check for timeouts on some
                 * more peers.  I try to check every connection 'n' times
                 * within the global minimum of all keepalive and timeout
                 * intervals, to ensure I attend to every connection within
                 * (n+1)/n times its timeout interval. */
                const int     p = 1;
                const int     n = 3;
                unsigned long min_timeout;
                int           chunk;

                /* careful with the jiffy wrap... */
                timeout = (long)(next_check_time - jiffies);
                if (timeout > 0) {
                        set_current_state(TASK_INTERRUPTIBLE);
                        add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);

                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                        schedule_timeout(timeout);

                        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                        set_current_state(TASK_RUNNING);
                        remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
                        continue;
                }

                if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
                        /* new min timeout set: restart min timeout scan */
                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        base_index = conn_index - 1;
                        if (base_index < 0)
                                base_index = conn_entries - 1;

                        if (kranal_data.kra_new_min_timeout < current_min_timeout) {
                                current_min_timeout = kranal_data.kra_new_min_timeout;
                                CDEBUG(D_NET, "Set new min timeout %ld\n",
                                       current_min_timeout);
                        }

                        kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
                }
                min_timeout = current_min_timeout;

                spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                LASSERT (min_timeout > 0);

                /* Compute how many table entries to check now so I get round
                 * the whole table fast enough, given that I do this at fixed
                 * intervals of 'p' seconds */
                chunk = conn_entries;
                if (min_timeout > n * p)
                        chunk = (chunk * n * p) / min_timeout;
                if (chunk == 0)
                        chunk = 1;
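                /* e.g. with a 64-entry hash table, n = 3, p = 1 and a
                 * 60-second min_timeout, chunk = (64 * 3 * 1) / 60 = 3,
                 * so scanning 3 entries per 1-second wakeup covers the
                 * whole table roughly 3 times per timeout interval. */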

                for (i = 0; i < chunk; i++) {
                        kranal_reaper_check(conn_index,
                                            &next_min_timeout);
                        conn_index = (conn_index + 1) % conn_entries;
                }

                next_check_time += p * HZ;

                spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                if ((conn_index - chunk <= base_index &&
                     base_index < conn_index) ||
                    (conn_index - conn_entries - chunk <= base_index &&
                     base_index < conn_index - conn_entries)) {

                        /* Scanned all conns: set current_min_timeout... */
                        if (current_min_timeout != next_min_timeout) {
                                current_min_timeout = next_min_timeout;
                                CDEBUG(D_NET, "Set new min timeout %ld\n",
                                       current_min_timeout);
                        }

                        /* ...and restart min timeout scan */
                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        base_index = conn_index - 1;
                        if (base_index < 0)
                                base_index = conn_entries - 1;
                }
        }

        kranal_thread_fini();
        return 0;
}

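/* Drain the device's RDMA completion queue.  RDMAs on a conn complete in
 * the order they were posted, so each completion must match the tx at the
 * head of that conn's rac_rdmaq (asserted via desc->AppPtr below); the tx
 * then moves to the fmaq so the scheduler sends its PUT_DONE/GET_DONE
 * completion message. */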
void
kranal_check_rdma_cq (kra_device_t *dev)
{
        kra_conn_t          *conn;
        kra_tx_t            *tx;
        RAP_RETURN           rrc;
        unsigned long        flags;
        RAP_RDMA_DESCRIPTOR *desc;
        __u32                cqid;
        __u32                event_type;

        for (;;) {
                rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE) {
                        CDEBUG(D_NET, "RDMA CQ %d empty\n", dev->rad_id);
                        return;
                }

                LASSERT (rrc == RAP_SUCCESS);
                LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);

                read_lock(&kranal_data.kra_global_lock);

                conn = kranal_cqid2conn_locked(cqid);
                if (conn == NULL) {
                        /* Conn was destroyed? */
                        CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid);
                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
                LASSERT (rrc == RAP_SUCCESS);

                CDEBUG(D_NET, "Completed %p\n",
                       list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list));

                spin_lock_irqsave(&conn->rac_lock, flags);

                LASSERT (!list_empty(&conn->rac_rdmaq));
                tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
                list_del(&tx->tx_list);

                LASSERT(desc->AppPtr == (void *)tx);
                LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
                        tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);

                list_add_tail(&tx->tx_list, &conn->rac_fmaq);
                tx->tx_qtime = jiffies;

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                /* Get conn's fmaq processed, now I've just put something
                 * there */
                kranal_schedule_conn(conn);

                read_unlock(&kranal_data.kra_global_lock);
        }
}

void
kranal_check_fma_cq (kra_device_t *dev)
{
        kra_conn_t         *conn;
        RAP_RETURN          rrc;
        __u32               cqid;
        __u32               event_type;
        struct list_head   *conns;
        struct list_head   *tmp;
        int                 i;

        for (;;) {
                rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE) {
                        CDEBUG(D_NET, "FMA CQ %d empty\n", dev->rad_id);
                        return;
                }

                LASSERT (rrc == RAP_SUCCESS);

                if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {

                        read_lock(&kranal_data.kra_global_lock);

                        conn = kranal_cqid2conn_locked(cqid);
                        if (conn == NULL) {
                                CDEBUG(D_NET, "FMA CQID lookup %d failed\n",
                                       cqid);
                        } else {
                                CDEBUG(D_NET, "FMA completed: %p CQID %d\n",
                                       conn, cqid);
                                kranal_schedule_conn(conn);
                        }

                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                /* FMA CQ has overflowed: check ALL conns */
                CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);

                for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {

                        read_lock(&kranal_data.kra_global_lock);

                        conns = &kranal_data.kra_conns[i];

                        list_for_each (tmp, conns) {
                                conn = list_entry(tmp, kra_conn_t,
                                                  rac_hashlist);

                                if (conn->rac_device == dev)
                                        kranal_schedule_conn(conn);
                        }

                        /* don't block write lockers for too long... */
                        read_unlock(&kranal_data.kra_global_lock);
                }
        }
}

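/* Push one message (with optional immediate payload) out on the FMA.
 * Returns 0 on success and -EAGAIN when the FMA has no credits; the
 * caller is rescheduled by an FMA completion event and can retry then.
 * Message types carrying RANAL_MSG_FENCE use the sync send variant,
 * presumably so they cannot overtake a just-posted RDMA. */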
1401 int
1402 kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
1403                void *immediate, int immediatenob)
1404 {
1405         int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
1406         RAP_RETURN rrc;
1407
1408         CDEBUG(D_NET,"%p sending msg %p %02x%s [%p for %d]\n",
1409                conn, msg, msg->ram_type, sync ? "(sync)" : "",
1410                immediate, immediatenob);
1411
1412         LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
1413         LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
1414                  immediatenob <= RANAL_FMA_MAX_DATA :
1415                  immediatenob == 0);
1416
1417         msg->ram_connstamp = conn->rac_my_connstamp;
1418         msg->ram_seq = conn->rac_tx_seq;
1419
1420         if (sync)
1421                 rrc = RapkFmaSyncSend(conn->rac_rihandle,
1422                                       immediate, immediatenob,
1423                                       msg, sizeof(*msg));
1424         else
1425                 rrc = RapkFmaSend(conn->rac_rihandle,
1426                                   immediate, immediatenob,
1427                                   msg, sizeof(*msg));
1428
1429         switch (rrc) {
1430         default:
1431                 LBUG();
1432
1433         case RAP_SUCCESS:
1434                 conn->rac_last_tx = jiffies;
1435                 conn->rac_tx_seq++;
1436                 return 0;
1437
1438         case RAP_NOT_DONE:
1439                 if (time_after_eq(jiffies,
1440                                   conn->rac_last_tx + conn->rac_keepalive*HZ))
1441                         CWARN("EAGAIN sending %02x (idle %lu secs)\n",
1442                                msg->ram_type, (jiffies - conn->rac_last_tx)/HZ);
1443                 return -EAGAIN;
1444         }
1445 }
1446
void
kranal_process_fmaq (kra_conn_t *conn)
{
        unsigned long flags;
        int           more_to_do;
        kra_tx_t     *tx;
        int           rc;
        int           expect_reply;

        /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
         *       However I will be rescheduled by an FMA completion event
         *       when I eventually get some.
         * NB 2. Sampling rac_state here races with setting it elsewhere.
         *       But it doesn't matter if I try to send a "real" message just
         *       as I start closing because I'll get scheduled to send the
         *       close anyway. */

        /* Not racing with incoming message processing! */
        LASSERT (current == conn->rac_device->rad_scheduler);

        if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
                if (!list_empty(&conn->rac_rdmaq)) {
                        /* RDMAs in progress */
                        LASSERT (!conn->rac_close_sent);

                        if (time_after_eq(jiffies,
                                          conn->rac_last_tx +
                                          conn->rac_keepalive * HZ)) {
                                CDEBUG(D_NET, "sending NOOP (rdma in progress)\n");
                                kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                                kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                        }
                        return;
                }

                if (conn->rac_close_sent)
                        return;

                CWARN("sending CLOSE to "LPX64"\n", conn->rac_peer->rap_nid);
                kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
                rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                if (rc != 0)
                        return;

                conn->rac_close_sent = 1;
                if (!conn->rac_close_recvd)
                        return;

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_CLOSING)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                return;
        }

        spin_lock_irqsave(&conn->rac_lock, flags);

        if (list_empty(&conn->rac_fmaq)) {

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                if (time_after_eq(jiffies,
                                  conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                        CDEBUG(D_NET, "sending NOOP -> "LPX64" (%p idle %lu(%ld))\n",
                               conn->rac_peer->rap_nid, conn,
                               (jiffies - conn->rac_last_tx)/HZ, conn->rac_keepalive);
                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                }
                return;
        }

        tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
        list_del(&tx->tx_list);
        more_to_do = !list_empty(&conn->rac_fmaq);

        spin_unlock_irqrestore(&conn->rac_lock, flags);

        expect_reply = 0;
        CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n",
               tx, tx->tx_msg.ram_type, tx->tx_cookie);
        switch (tx->tx_msg.ram_type) {
        default:
                LBUG();

        case RANAL_MSG_IMMEDIATE:
                rc = kranal_sendmsg(conn, &tx->tx_msg,
                                    tx->tx_buffer, tx->tx_nob);
                break;

        case RANAL_MSG_PUT_NAK:
        case RANAL_MSG_PUT_DONE:
        case RANAL_MSG_GET_NAK:
        case RANAL_MSG_GET_DONE:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                break;

        case RANAL_MSG_PUT_REQ:
                rc = kranal_map_buffer(tx);
                LASSERT (rc != -EAGAIN);
                if (rc != 0)
                        break;

                tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;

        case RANAL_MSG_PUT_ACK:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;

        case RANAL_MSG_GET_REQ:
                rc = kranal_map_buffer(tx);
                LASSERT (rc != -EAGAIN);
                if (rc != 0)
                        break;

                tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;
        }

        if (rc == -EAGAIN) {
                /* I need credits to send this.  Replace tx at the head of the
                 * fmaq and I'll get rescheduled when credits appear */
                CDEBUG(D_NET, "EAGAIN on %p\n", conn);
                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add(&tx->tx_list, &conn->rac_fmaq);
                spin_unlock_irqrestore(&conn->rac_lock, flags);
                return;
        }

        if (!expect_reply || rc != 0) {
                kranal_tx_done(tx, rc);
        } else {
                /* LASSERT(current) above ensures this doesn't race with reply
                 * processing */
                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add_tail(&tx->tx_list, &conn->rac_replyq);
                tx->tx_qtime = jiffies;
                spin_unlock_irqrestore(&conn->rac_lock, flags);
        }

        if (more_to_do) {
                CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn);
                kranal_schedule_conn(conn);
        }
}

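/* A note on the keepalive arithmetic used above (illustrative sketch, not
 * driver code): jiffies wraps, so deadlines are compared with
 * time_after_eq(), which stays correct across the wrap by comparing via
 * signed subtraction:
 *
 *         unsigned long deadline = conn->rac_last_tx +
 *                                  conn->rac_keepalive * HZ;
 *
 *         if (time_after_eq(jiffies, deadline)) {
 *                 kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
 *                 kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
 *         }
 */
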
static inline void
kranal_swab_rdma_desc (kra_rdma_desc_t *d)
{
        __swab64s(&d->rard_key.Key);
        __swab16s(&d->rard_key.Cookie);
        __swab16s(&d->rard_key.MdHandle);
        __swab32s(&d->rard_key.Flags);
        __swab64s(&d->rard_addr.AddressBits);
        __swab32s(&d->rard_nob);
}

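/* Note on the field-by-field swabbing above (illustrative): rard_key is a
 * struct of mixed 16/32/64-bit members, so a cross-endian receiver must
 * byte-swap each member at its own width; one bulk 64-bit swap of the whole
 * descriptor would scramble adjacent fields.  Each __swabNNs() helper
 * byte-reverses an NN-bit value in place, e.g.
 *
 *         __u32 x = 0x11223344;
 *         __swab32s(&x);          now x == 0x44332211
 */
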
kra_tx_t *
kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
{
        struct list_head *ttmp;
        kra_tx_t         *tx;
        unsigned long     flags;

        spin_lock_irqsave(&conn->rac_lock, flags);

        list_for_each(ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                CDEBUG(D_NET, "Checking %p %02x/"LPX64"\n",
                       tx, tx->tx_msg.ram_type, tx->tx_cookie);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_msg.ram_type != type) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CWARN("Unexpected type %x (%x expected) "
                              "matched reply from "LPX64"\n",
                              tx->tx_msg.ram_type, type,
                              conn->rac_peer->rap_nid);
                        return NULL;
                }

                list_del(&tx->tx_list);
                spin_unlock_irqrestore(&conn->rac_lock, flags);
                return tx;
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        CWARN("Unmatched reply %02x/"LPX64" from "LPX64"\n",
              type, cookie, conn->rac_peer->rap_nid);
        return NULL;
}

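/* Usage sketch (illustrative): completion messages echo back the cookie the
 * requester generated, so the rx path can pair each one with the tx parked
 * on rac_replyq.  For example, matching a GET completion:
 *
 *         tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
 *                                 msg->ram_u.completion.racm_cookie);
 *         if (tx != NULL)
 *                 kranal_tx_done(tx, 0);
 *
 * kranal_check_fma_rx() below does this for each completion type. */
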
void
kranal_check_fma_rx (kra_conn_t *conn)
{
        unsigned long flags;
        __u32         seq;
        kra_tx_t     *tx;
        kra_msg_t    *msg;
        void         *prefix;
        RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
        kra_peer_t   *peer = conn->rac_peer;

        if (rrc == RAP_NOT_DONE)
                return;

        CDEBUG(D_NET, "RX on %p\n", conn);

        LASSERT (rrc == RAP_SUCCESS);
        conn->rac_last_rx = jiffies;
        seq = conn->rac_rx_seq++;
        msg = (kra_msg_t *)prefix;

        /* stash message for portals callbacks; they'll NULL
         * rac_rxmsg if they consume it */
        LASSERT (conn->rac_rxmsg == NULL);
        conn->rac_rxmsg = msg;

        if (msg->ram_magic != RANAL_MSG_MAGIC) {
                if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
                        CERROR("Unexpected magic %08x from "LPX64"\n",
                               msg->ram_magic, peer->rap_nid);
                        goto out;
                }

                __swab32s(&msg->ram_magic);
                __swab16s(&msg->ram_version);
                __swab16s(&msg->ram_type);
                __swab64s(&msg->ram_srcnid);
                __swab64s(&msg->ram_connstamp);
                __swab32s(&msg->ram_seq);

                /* NB message type checked below; NOT here... */
                switch (msg->ram_type) {
                case RANAL_MSG_PUT_ACK:
                        kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
                        break;

                case RANAL_MSG_GET_REQ:
                        kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
                        break;

                default:
                        break;
                }
        }

        if (msg->ram_version != RANAL_MSG_VERSION) {
                CERROR("Unexpected protocol version %d from "LPX64"\n",
                       msg->ram_version, peer->rap_nid);
                goto out;
        }

        if (msg->ram_srcnid != peer->rap_nid) {
                CERROR("Unexpected peer "LPX64" from "LPX64"\n",
                       msg->ram_srcnid, peer->rap_nid);
                goto out;
        }

        if (msg->ram_connstamp != conn->rac_peer_connstamp) {
                CERROR("Unexpected connstamp "LPX64"("LPX64
                       " expected) from "LPX64"\n",
                       msg->ram_connstamp, conn->rac_peer_connstamp,
                       peer->rap_nid);
                goto out;
        }

        if (msg->ram_seq != seq) {
                CERROR("Unexpected sequence number %d(%d expected) from "
                       LPX64"\n", msg->ram_seq, seq, peer->rap_nid);
                goto out;
        }

        if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
                /* This message signals RDMA completion... */
                rrc = RapkFmaSyncWait(conn->rac_rihandle);
                LASSERT (rrc == RAP_SUCCESS);
        }

        if (conn->rac_close_recvd) {
                CERROR("Unexpected message %d after CLOSE from "LPX64"\n",
                       msg->ram_type, conn->rac_peer->rap_nid);
                goto out;
        }

        if (msg->ram_type == RANAL_MSG_CLOSE) {
                CWARN("RX CLOSE from "LPX64"\n", conn->rac_peer->rap_nid);
                conn->rac_close_recvd = 1;
                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_ESTABLISHED)
                        kranal_close_conn_locked(conn, 0);
                else if (conn->rac_state == RANAL_CONN_CLOSING &&
                         conn->rac_close_sent)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                goto out;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                goto out;

        switch (msg->ram_type) {
        case RANAL_MSG_NOOP:
                /* Nothing to do; just a keepalive */
                CDEBUG(D_NET, "RX NOOP on %p\n", conn);
                break;

        case RANAL_MSG_IMMEDIATE:
                CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);
                lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);
                break;

        case RANAL_MSG_PUT_REQ:
                CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);
                lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);

                if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
                        break;

                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);
                if (tx == NULL)
                        break;

                tx->tx_msg.ram_u.completion.racm_cookie =
                        msg->ram_u.putreq.raprm_cookie;
                kranal_post_fma(conn, tx);
                break;

        case RANAL_MSG_PUT_NAK:
                CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_PUT_ACK:
                CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.putack.rapam_src_cookie);
                if (tx == NULL)
                        break;

                kranal_rdma(tx, RANAL_MSG_PUT_DONE,
                            &msg->ram_u.putack.rapam_desc,
                            msg->ram_u.putack.rapam_desc.rard_nob,
                            msg->ram_u.putack.rapam_dst_cookie);
                break;

        case RANAL_MSG_PUT_DONE:
                CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, 0);
                break;

        case RANAL_MSG_GET_REQ:
                CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);
                lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn);

                if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
                        break;

                tx = kranal_new_tx_msg(0, RANAL_MSG_GET_NAK);
                if (tx == NULL)
                        break;

                tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie;
                kranal_post_fma(conn, tx);
                break;

        case RANAL_MSG_GET_NAK:
                CDEBUG(D_NET, "RX GET_NAK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_GET_DONE:
                CDEBUG(D_NET, "RX GET_DONE on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, 0);
                break;
        }

 out:
        if (conn->rac_rxmsg != NULL)
                kranal_consume_rxmsg(conn, NULL, 0);

        /* check again later */
        kranal_schedule_conn(conn);
}

void
kranal_complete_closed_conn (kra_conn_t *conn)
{
        kra_tx_t   *tx;
        int         nfma;
        int         nreplies;

        LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
        LASSERT (list_empty(&conn->rac_list));
        LASSERT (list_empty(&conn->rac_hashlist));

        for (nfma = 0; !list_empty(&conn->rac_fmaq); nfma++) {
                tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);

                list_del(&tx->tx_list);
                kranal_tx_done(tx, -ECONNABORTED);
        }

        LASSERT (list_empty(&conn->rac_rdmaq));

        for (nreplies = 0; !list_empty(&conn->rac_replyq); nreplies++) {
                tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);

                list_del(&tx->tx_list);
                kranal_tx_done(tx, -ECONNABORTED);
        }

        CWARN("Closed conn %p -> "LPX64": nmsg %d nreplies %d\n",
              conn, conn->rac_peer->rap_nid, nfma, nreplies);
}

int
kranal_process_new_conn (kra_conn_t *conn)
{
        RAP_RETURN   rrc;

        rrc = RapkCompleteSync(conn->rac_rihandle, 1);
        if (rrc == RAP_SUCCESS)
                return 0;

        LASSERT (rrc == RAP_NOT_DONE);
        if (!time_after_eq(jiffies, conn->rac_last_tx +
                           conn->rac_timeout * HZ))
                return -EAGAIN;

        /* Too late */
        rrc = RapkCompleteSync(conn->rac_rihandle, 0);
        LASSERT (rrc == RAP_SUCCESS);
        return -ETIMEDOUT;
}

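/* Caller's contract (illustrative sketch): -EAGAIN means "connection
 * completion not done yet; poll me again later".  The scheduler below
 * retries with exponential backoff, reusing rac_keepalive as the retry
 * interval in jiffies:
 *
 *         rc = kranal_process_new_conn(conn);
 *         if (rc == -EAGAIN) {
 *                 if (conn->rac_keepalive == 0)
 *                         conn->rac_keepalive = 1;
 *                 else if (conn->rac_keepalive <= HZ)
 *                         conn->rac_keepalive *= 2;
 *                 else
 *                         conn->rac_keepalive += HZ;
 *         }
 */
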
int
kranal_scheduler (void *arg)
{
        kra_device_t     *dev = (kra_device_t *)arg;
        wait_queue_t      wait;
        char              name[16];
        kra_conn_t       *conn;
        unsigned long     flags;
        unsigned long     deadline;
        unsigned long     soonest;
        int               nsoonest;
        long              timeout;
        struct list_head *tmp;
        struct list_head *nxt;
        int               rc;
        int               dropped_lock;
        int               busy_loops = 0;

        snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
        kportal_daemonize(name);
        kportal_blockallsigs();

        dev->rad_scheduler = current;
        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&dev->rad_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* Safe: kra_shutdown only set when quiescent */

                if (busy_loops++ >= RANAL_RESCHED) {
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        our_cond_resched();
                        busy_loops = 0;

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                dropped_lock = 0;

                if (dev->rad_ready) {
                        /* Device callback fired since I last checked it */
                        dev->rad_ready = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);
                        dropped_lock = 1;

                        kranal_check_rdma_cq(dev);
                        kranal_check_fma_cq(dev);

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                list_for_each_safe(tmp, nxt, &dev->rad_ready_conns) {
                        conn = list_entry(tmp, kra_conn_t, rac_schedlist);

                        list_del_init(&conn->rac_schedlist);
                        LASSERT (conn->rac_scheduled);
                        conn->rac_scheduled = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);
                        dropped_lock = 1;

                        kranal_check_fma_rx(conn);
                        kranal_process_fmaq(conn);

                        if (conn->rac_state == RANAL_CONN_CLOSED)
                                kranal_complete_closed_conn(conn);

                        kranal_conn_decref(conn);
                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                nsoonest = 0;
                soonest = jiffies;

                list_for_each_safe(tmp, nxt, &dev->rad_new_conns) {
                        conn = list_entry(tmp, kra_conn_t, rac_schedlist);

                        deadline = conn->rac_last_tx + conn->rac_keepalive;
                        if (time_after_eq(jiffies, deadline)) {
                                /* Time to process this new conn */
                                spin_unlock_irqrestore(&dev->rad_lock, flags);
                                dropped_lock = 1;

                                rc = kranal_process_new_conn(conn);
                                if (rc != -EAGAIN) {
                                        /* All done with this conn */
                                        spin_lock_irqsave(&dev->rad_lock, flags);
                                        list_del_init(&conn->rac_schedlist);
                                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                                        kranal_conn_decref(conn);
                                        spin_lock_irqsave(&dev->rad_lock, flags);
                                        continue;
                                }

                                /* retry with exponential backoff until HZ */
                                if (conn->rac_keepalive == 0)
                                        conn->rac_keepalive = 1;
                                else if (conn->rac_keepalive <= HZ)
                                        conn->rac_keepalive *= 2;
                                else
                                        conn->rac_keepalive += HZ;

                                deadline = conn->rac_last_tx + conn->rac_keepalive;
                                spin_lock_irqsave(&dev->rad_lock, flags);
                        }

                        /* Does this conn need attention soonest? */
                        if (nsoonest++ == 0 ||
                            !time_after_eq(deadline, soonest))
                                soonest = deadline;
                }

                if (dropped_lock)               /* may sleep iff I didn't drop the lock */
                        continue;

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&dev->rad_waitq, &wait);
                spin_unlock_irqrestore(&dev->rad_lock, flags);

                if (nsoonest == 0) {
                        busy_loops = 0;
                        schedule();
                } else {
                        timeout = (long)(soonest - jiffies);
                        if (timeout > 0) {
                                busy_loops = 0;
                                schedule_timeout(timeout);
                        }
                }

                remove_wait_queue(&dev->rad_waitq, &wait);
                set_current_state(TASK_RUNNING);
                spin_lock_irqsave(&dev->rad_lock, flags);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);

        dev->rad_scheduler = NULL;
        kranal_thread_fini();
        return 0;
}

lib_nal_t kranal_lib = {
        libnal_data:        &kranal_data,      /* NAL private data */
        libnal_send:         kranal_send,
        libnal_send_pages:   kranal_send_pages,
        libnal_recv:         kranal_recv,
        libnal_recv_pages:   kranal_recv_pages,
        libnal_dist:         kranal_dist
};
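
/* Note (illustrative): the "name:" initialisers above are GCC's old
 * designated-initialiser extension, presumably kept for pre-C99 compilers.
 * The equivalent standard C99 spelling of the same dispatch table would be:
 *
 *         lib_nal_t kranal_lib = {
 *                 .libnal_data       = &kranal_data,
 *                 .libnal_send       = kranal_send,
 *                 .libnal_send_pages = kranal_send_pages,
 *                 .libnal_recv       = kranal_recv,
 *                 .libnal_recv_pages = kranal_recv_pages,
 *                 .libnal_dist       = kranal_dist,
 *         };
 */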