/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "ranal.h"

int
kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
{
        /* I would guess that if kranal_get_peer (nid) == NULL,
         * and we're not routing, then 'nid' is very distant :) */
        if (nal->libnal_ni.ni_pid.nid == nid) {
                *dist = 0;
        } else {
                *dist = 1;
        }

        return 0;
}

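/* Completion callback for 'devid': flag the device ready and wake its
 * scheduler.  (Presumably registered with the RapidArray driver; the
 * registration itself isn't shown in this file.) */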
void
kranal_device_callback(RAP_INT32 devid)
{
        kra_device_t *dev;
        int           i;
        unsigned long flags;

        for (i = 0; i < kranal_data.kra_ndevs; i++) {

                dev = &kranal_data.kra_devices[i];
                if (dev->rad_id != devid)
                        continue;

                spin_lock_irqsave(&dev->rad_lock, flags);

                if (!dev->rad_ready) {
                        dev->rad_ready = 1;
                        wake_up(&dev->rad_waitq);
                }

                spin_unlock_irqrestore(&dev->rad_lock, flags);
                return;
        }

        CWARN("callback for unknown device %d\n", devid);
}

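/* Queue 'conn' for attention from its device's scheduler, taking a
 * reference for the scheduler if it isn't queued already. */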
void
kranal_schedule_conn(kra_conn_t *conn)
{
        kra_device_t    *dev = conn->rac_device;
        unsigned long    flags;

        spin_lock_irqsave(&dev->rad_lock, flags);

        if (!conn->rac_scheduled) {
                kranal_conn_addref(conn);       /* +1 ref for scheduler */
                conn->rac_scheduled = 1;
                list_add_tail(&conn->rac_schedlist, &dev->rad_connq);
                wake_up(&dev->rad_waitq);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);
}

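/* Schedule the connection (if any) that owns completion queue 'cqid' */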
void
kranal_schedule_cqid (__u32 cqid)
{
        kra_conn_t *conn;

        read_lock(&kranal_data.kra_global_lock);

        conn = kranal_cqid2conn_locked(cqid);

        if (conn == NULL)
                CWARN("no cqid %x\n", cqid);
        else
                kranal_schedule_conn(conn);

        read_unlock(&kranal_data.kra_global_lock);
}

void
kranal_schedule_dev(kra_device_t *dev)
{
        kra_conn_t         *conn;
        struct list_head   *conns;
        struct list_head   *tmp;
        int                 i;

        /* Don't do this in IRQ context (servers may have 1000s of clients) */
        LASSERT (!in_interrupt());

        CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);

        for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {

                /* Drop the lock on each hash bucket to ensure we don't
                 * block anyone for too long at IRQ priority on another CPU */

                read_lock(&kranal_data.kra_global_lock);

                conns = &kranal_data.kra_conns[i];

                list_for_each (tmp, conns) {
                        conn = list_entry(tmp, kra_conn_t, rac_hashlist);

                        if (conn->rac_device == dev)
                                kranal_schedule_conn(conn);
                }
                read_unlock(&kranal_data.kra_global_lock);
        }
}

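/* Complete 'tx': deregister its buffer if it was mapped, finalise any
 * attached lib messages with success iff 'completion' is zero, then
 * return the descriptor to the appropriate idle list. */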
void
kranal_tx_done (kra_tx_t *tx, int completion)
{
        ptl_err_t        ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
        kra_device_t    *dev;
        unsigned long    flags;
        int              i;
        RAP_RETURN       rrc;

        LASSERT (!in_interrupt());

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_UNMAPPED:
        case RANAL_BUF_VIRT_UNMAPPED:
                break;

        case RANAL_BUF_PHYS_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
                                           dev->rad_ptag, &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                break;

        case RANAL_BUF_VIRT_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
                                           dev->rad_ptag, &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                break;
        }

        for (i = 0; i < 2; i++) {
                /* tx may have up to 2 libmsgs to finalise */
                if (tx->tx_libmsg[i] == NULL)
                        continue;

                lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
                tx->tx_libmsg[i] = NULL;
        }

        tx->tx_buftype = RANAL_BUF_NONE;
        tx->tx_msg.ram_type = RANAL_MSG_NONE;
        tx->tx_conn = NULL;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        if (tx->tx_isnblk) {
                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
        } else {
                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
                wake_up(&kranal_data.kra_idle_tx_waitq);
        }

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
}

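/* Grab an idle tx descriptor.  Callers that may not block (may_block == 0)
 * dip into the reserved pool instead of sleeping on the waitqueue. */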
kra_tx_t *
kranal_get_idle_tx (int may_block)
{
        unsigned long  flags;
        kra_tx_t      *tx = NULL;

        for (;;) {
                spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

                /* "normal" descriptor is free */
                if (!list_empty(&kranal_data.kra_idle_txs)) {
                        tx = list_entry(kranal_data.kra_idle_txs.next,
                                        kra_tx_t, tx_list);
                        break;
                }

                if (!may_block) {
                        /* may dip into reserve pool */
                        if (list_empty(&kranal_data.kra_idle_nblk_txs)) {
                                CERROR("reserved tx desc pool exhausted\n");
                                break;
                        }

                        tx = list_entry(kranal_data.kra_idle_nblk_txs.next,
                                        kra_tx_t, tx_list);
                        break;
                }

                /* block for idle tx */
                spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

                wait_event(kranal_data.kra_idle_tx_waitq,
                           !list_empty(&kranal_data.kra_idle_txs));
        }

        if (tx != NULL) {
                list_del(&tx->tx_list);

                /* Allocate a new completion cookie.  It might not be
                 * needed, but we've got a lock right now... */
                tx->tx_cookie = kranal_data.kra_next_tx_cookie++;

                LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
                LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
                LASSERT (tx->tx_conn == NULL);
                LASSERT (tx->tx_libmsg[0] == NULL);
                LASSERT (tx->tx_libmsg[1] == NULL);
        }

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        return tx;
}

void
kranal_init_msg(kra_msg_t *msg, int type)
{
        msg->ram_magic = RANAL_MSG_MAGIC;
        msg->ram_version = RANAL_MSG_VERSION;
        msg->ram_type = type;
        msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
        /* ram_incarnation gets set when FMA is sent */
}

kra_tx_t *
kranal_new_tx_msg (int may_block, int type)
{
        kra_tx_t *tx = kranal_get_idle_tx(may_block);

        if (tx == NULL)
                return NULL;

        kranal_init_msg(&tx->tx_msg, type);
        return tx;
}

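/* The three buffer-setup routines below attach the payload described by
 * (niov, iov/kiov, offset, nob) to 'tx'.  Immediate and virtual payloads
 * must lie within a single contiguous fragment; physical (paged) payloads
 * may span pages but may not have gaps between them. */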
int
kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                               int offset, int nob)
{
        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");
                return -EMSGSIZE;
        }

        tx->tx_buftype = RANAL_BUF_IMMEDIATE;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        return 0;
}

int
kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                          int offset, int nob)
{
        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");
                return -EMSGSIZE;
        }

        tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        return 0;
}

int
kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
                          int offset, int nob)
{
        RAP_PHYS_REGION *phys = tx->tx_phys;
        int              resid;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));

        phys->Address = kranal_page2phys(kiov->kiov_page);
        phys->Length  = PAGE_SIZE;
        phys++;

        resid = nob - (kiov->kiov_len - offset);
        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR("Can't make payload contiguous in I/O VM:"
                               "page %d, offset %d, len %d\n",
                               (int)(phys - tx->tx_phys),
                               kiov->kiov_offset, kiov->kiov_len);
                        return -EINVAL;
                }

                if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) {
                        CERROR("payload too big (%d)\n",
                               (int)(phys - tx->tx_phys));
                        return -EMSGSIZE;
                }

                phys->Address = kranal_page2phys(kiov->kiov_page);
                phys->Length  = PAGE_SIZE;
                phys++;

                resid -= PAGE_SIZE;
        }

        tx->tx_phys_npages = phys - tx->tx_phys;
        return 0;
}

static inline int
kranal_setup_buffer (kra_tx_t *tx, int niov,
                     struct iovec *iov, ptl_kiov_t *kiov,
                     int offset, int nob)
{
        LASSERT ((iov == NULL) != (kiov == NULL));

        if (kiov != NULL)
                return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);

        return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
}

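/* Register tx's buffer with its connection's device so it can be the
 * source or sink of RDMA. */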
void
kranal_map_buffer (kra_tx_t *tx)
{
        kra_conn_t     *conn = tx->tx_conn;
        kra_device_t   *dev = conn->rac_device;
        RAP_RETURN      rrc;

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_PHYS_UNMAPPED:
                rrc = RapkRegisterPhys(dev->rad_handle,
                                       tx->tx_phys, tx->tx_phys_npages,
                                       dev->rad_ptag,
                                       &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
                return;

        case RANAL_BUF_VIRT_UNMAPPED:
                rrc = RapkRegisterMemory(dev->rad_handle,
                                         tx->tx_buffer, tx->tx_nob,
                                         dev->rad_ptag,
                                         &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
                return;
        }
}

kra_conn_t *
kranal_find_conn_locked (kra_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->rap_conns) {
                return list_entry(tmp, kra_conn_t, rac_list);
        }

        return NULL;
}

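/* Queue 'tx' on conn's FMA queue and prod the scheduler to send it. */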
void
kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
{
        unsigned long    flags;

        tx->tx_conn = conn;

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);

        kranal_schedule_conn(conn);
}

void
kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
{
        unsigned long    flags;
        kra_peer_t      *peer;
        kra_conn_t      *conn;
        unsigned long    now;
        rwlock_t        *g_lock = &kranal_data.kra_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */

        read_lock(g_lock);

        peer = kranal_find_peer_locked(nid);
        if (peer == NULL) {
                read_unlock(g_lock);
                kranal_tx_done(tx, -EHOSTUNREACH);
                return;
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                kranal_post_fma(conn, tx);
                read_unlock(g_lock);
                return;
        }

        /* Making one or more connections; I'll need a write lock... */
        read_unlock(g_lock);
        write_lock_irqsave(g_lock, flags);

        peer = kranal_find_peer_locked(nid);
        if (peer == NULL) {
                write_unlock_irqrestore(g_lock, flags);
                kranal_tx_done(tx, -EHOSTUNREACH);
                return;
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kranal_post_fma(conn, tx);
                write_unlock_irqrestore(g_lock, flags);
                return;
        }

        LASSERT (peer->rap_persistence > 0);

        if (!peer->rap_connecting) {
                now = CURRENT_TIME;
                if (now < peer->rap_reconnect_time) {
                        write_unlock_irqrestore(g_lock, flags);
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                peer->rap_connecting = 1;
                kranal_peer_addref(peer); /* extra ref for connd */

                spin_lock(&kranal_data.kra_connd_lock);

                list_add_tail(&peer->rap_connd_list,
                              &kranal_data.kra_connd_peers);
                wake_up(&kranal_data.kra_connd_waitq);

                spin_unlock(&kranal_data.kra_connd_lock);
        }

        /* A connection is being established; queue the message... */
        list_add_tail(&tx->tx_list, &peer->rap_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}

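/* Post the RDMA described by 'rard' and park 'tx' on conn's rdmaq until
 * the completion event arrives; 'type' and 'cookie' prime the completion
 * message sent afterwards.  A zero-length RDMA completes immediately. */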
static void
kranal_rdma(kra_tx_t *tx, int type,
            kra_rdma_desc_t *rard, int nob, __u64 cookie)
{
        kra_conn_t   *conn = tx->tx_conn;
        RAP_RETURN    rrc;
        unsigned long flags;

        /* prep final completion message */
        kranal_init_msg(&tx->tx_msg, type);
        tx->tx_msg.ram_u.completion.racm_cookie = cookie;

        LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
        LASSERT (nob <= rard->rard_nob);

        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
        tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
        tx->tx_rdma_desc.DstPtr = rard->rard_addr;
        tx->tx_rdma_desc.DstKey = rard->rard_key;
        tx->tx_rdma_desc.Length = nob;
        tx->tx_rdma_desc.AppPtr = tx;

        if (nob == 0) { /* Immediate completion */
                kranal_post_fma(conn, tx);
                return;
        }

        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
        LASSERT (rrc == RAP_SUCCESS);

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);
}

int
kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
{
        __u32      nob_received = nob;
        RAP_RETURN rrc;

        LASSERT (conn->rac_rxmsg != NULL);

        rrc = RapkFmaCopyToUser(conn->rac_rihandle, buffer,
                                &nob_received, sizeof(kra_msg_t));
        LASSERT (rrc == RAP_SUCCESS);

        conn->rac_rxmsg = NULL;

        if (nob_received != nob) {
                CWARN("Expected %d immediate bytes but got %d\n",
                      nob, nob_received);
                return -EPROTO;
        }

        return 0;
}

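/* Common send path: called with iov payloads from kranal_send() and kiov
 * payloads from kranal_send_pages(). */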
ptl_err_t
kranal_do_send (lib_nal_t    *nal,
                void         *private,
                lib_msg_t    *libmsg,
                ptl_hdr_t    *hdr,
                int           type,
                ptl_nid_t     nid,
                ptl_pid_t     pid,
                unsigned int  niov,
                struct iovec *iov,
                ptl_kiov_t   *kiov,
                size_t        offset,
                size_t        nob)
{
        kra_conn_t *conn;
        kra_tx_t   *tx;
        int         rc;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
               " pid %d\n", nob, niov, nid, pid);

        LASSERT (nob == 0 || niov > 0);
        LASSERT (niov <= PTL_MD_MAX_IOV);

        LASSERT (!in_interrupt());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch(type) {
        default:
                LBUG();

        case PTL_MSG_REPLY: {
                /* reply's 'private' is the conn that received the GET_REQ */
                conn = private;
                LASSERT (conn->rac_rxmsg != NULL);

                if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
                        if (nob > RANAL_MAX_IMMEDIATE) {
                                CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
                                       nob, nid);
                                return PTL_FAIL;
                        }
                        break;                  /* RDMA not expected */
                }

                /* Incoming message consistent with immediate reply? */
                if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
                        CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
                               nid, conn->rac_rxmsg->ram_type);
                        return PTL_FAIL;
                }

                tx = kranal_get_idle_tx(0);
                if (tx == NULL)
                        return PTL_FAIL;

                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_conn = conn;
                tx->tx_libmsg[0] = libmsg;

                kranal_map_buffer(tx);
                kranal_rdma(tx, RANAL_MSG_GET_DONE,
                            &conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
                            conn->rac_rxmsg->ram_u.get.ragm_cookie);
                return PTL_OK;
        }

        case PTL_MSG_GET:
                if (kiov == NULL &&             /* not paged */
                    nob <= RANAL_MAX_IMMEDIATE && /* small enough */
                    nob <= kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(0, RANAL_MSG_GET_REQ);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg);
                if (tx->tx_libmsg[1] == NULL) {
                        CERROR("Can't create reply for GET to "LPX64"\n", nid);
                        kranal_tx_done(tx, -ENOMEM);    /* NB rc is 0 here */
                        return PTL_FAIL;
                }

                tx->tx_libmsg[0] = libmsg;
                tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, nid);
                return PTL_OK;

        case PTL_MSG_ACK:
                LASSERT (nob == 0);
                break;

        case PTL_MSG_PUT:
                if (kiov == NULL &&             /* not paged */
                    nob <= RANAL_MAX_IMMEDIATE && /* small enough */
                    nob <= kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[0] = libmsg;
                tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, nid);
                return PTL_OK;
        }

        LASSERT (kiov == NULL);
        LASSERT (nob <= RANAL_MAX_IMMEDIATE);

        tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
                                 type == PTL_MSG_REPLY ||
                                 in_interrupt()),
                               RANAL_MSG_IMMEDIATE);
        if (tx == NULL)
                return PTL_NO_SPACE;

        rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
        if (rc != 0) {
                kranal_tx_done(tx, rc);
                return PTL_FAIL;
        }

        tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
        tx->tx_libmsg[0] = libmsg;
        kranal_launch_tx(tx, nid);
        return PTL_OK;
}

ptl_err_t
kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
             ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
             unsigned int niov, struct iovec *iov,
             size_t offset, size_t len)
{
        return kranal_do_send(nal, private, cookie,
                              hdr, type, nid, pid,
                              niov, iov, NULL,
                              offset, len);
}

ptl_err_t
kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
                   ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                   unsigned int niov, ptl_kiov_t *kiov,
                   size_t offset, size_t len)
{
        return kranal_do_send(nal, private, cookie,
                              hdr, type, nid, pid,
                              niov, NULL, kiov,
                              offset, len);
}

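/* Common receive path: called with iov sinks from kranal_recv() and kiov
 * sinks from kranal_recv_pages(). */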
ptl_err_t
kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
                unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
                size_t offset, size_t mlen, size_t rlen)
{
        kra_conn_t  *conn = private;
        kra_msg_t   *rxmsg = conn->rac_rxmsg;
        kra_tx_t    *tx;
        void        *buffer;
        int          rc;

        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch(rxmsg->ram_type) {
        default:
                LBUG();
                return PTL_FAIL;

        case RANAL_MSG_IMMEDIATE:
                if (mlen == 0) {
                        buffer = NULL;
                } else if (kiov != NULL) {
                        CERROR("Can't recv immediate into paged buffer\n");
                        return PTL_FAIL;
                } else {
                        LASSERT (niov > 0);
                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                        }
                        if (mlen > iov->iov_len - offset) {
                                CERROR("Can't handle immediate frags\n");
                                return PTL_FAIL;
                        }
                        buffer = ((char *)iov->iov_base) + offset;
                }
                rc = kranal_consume_rxmsg(conn, buffer, mlen);
                lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
                return PTL_OK;

        case RANAL_MSG_GET_REQ:
                /* If the GET matched, we've already handled it in
                 * kranal_do_send which is called to send the REPLY.  We're
                 * only called here to complete the GET receive (if we needed
                 * it which we don't, but I digress...) */
                LASSERT (libmsg == NULL);
                lib_finalize(nal, NULL, libmsg, PTL_OK);
                return PTL_OK;

        case RANAL_MSG_PUT_REQ:
                if (libmsg == NULL) {           /* PUT didn't match... */
                        lib_finalize(nal, NULL, libmsg, PTL_OK);
                        return PTL_OK;
                }

                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, mlen);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                kranal_map_buffer(tx);

                tx->tx_msg.ram_u.putack.rapam_src_cookie =
                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;

                tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */

                kranal_post_fma(conn, tx);

                /* flag matched by consuming rx message */
                kranal_consume_rxmsg(conn, NULL, 0);
                return PTL_OK;
        }
}

ptl_err_t
kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
             unsigned int niov, struct iovec *iov,
             size_t offset, size_t mlen, size_t rlen)
{
        return kranal_recvmsg(nal, private, msg, niov, iov, NULL,
                              offset, mlen, rlen);
}

ptl_err_t
kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
                   unsigned int niov, ptl_kiov_t *kiov,
                   size_t offset, size_t mlen, size_t rlen)
{
        return kranal_recvmsg(nal, private, msg, niov, NULL, kiov,
                              offset, mlen, rlen);
}

int
kranal_thread_start (int(*fn)(void *arg), void *arg)
{
        long    pid = kernel_thread(fn, arg, 0);

        if (pid < 0)
                return (int)pid;

        atomic_inc(&kranal_data.kra_nthreads);
        return 0;
}

void
kranal_thread_fini (void)
{
        atomic_dec(&kranal_data.kra_nthreads);
}

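/* Return 0 if 'conn' seems healthy, or -ETIMEDOUT if nothing has been
 * received recently or a queued tx has been blocked too long. */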
int
kranal_check_conn (kra_conn_t *conn)
{
        kra_tx_t          *tx;
        struct list_head  *ttmp;
        unsigned long      flags;
        long               timeout;
        unsigned long      now = jiffies;

        if (!conn->rac_closing &&
            time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                /* not sent in a while; schedule conn so scheduler sends a keepalive */
                kranal_schedule_conn(conn);
        }

        /* wait twice as long for CLOSE to be sure peer is dead */
        timeout = (conn->rac_closing ? 1 : 2) * conn->rac_timeout * HZ;

        if (!conn->rac_close_recvd &&
            time_after_eq(now, conn->rac_last_rx + timeout)) {
                CERROR("Nothing received from "LPX64" within %lu seconds\n",
                       conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
                return -ETIMEDOUT;
        }

        if (conn->rac_closing)
                return 0;

        /* Check the conn's queues are moving.  These are "belt+braces" checks,
         * in case of hardware/software errors that make this conn seem
         * responsive even though it isn't progressing its message queues. */

        spin_lock_irqsave(&conn->rac_lock, flags);

        list_for_each (ttmp, &conn->rac_fmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_rdmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        return 0;
}

void
kranal_check_conns (int idx, unsigned long *min_timeoutp)
{
        struct list_head  *conns = &kranal_data.kra_conns[idx];
        struct list_head  *ctmp;
        kra_conn_t        *conn;
        unsigned long      flags;
        int                rc;

 again:
        /* NB. We expect to check all the conns and not find any problems, so
         * we just use a shared lock while we take a look... */
        read_lock(&kranal_data.kra_global_lock);

        list_for_each (ctmp, conns) {
                conn = list_entry(ctmp, kra_conn_t, rac_hashlist);

                if (conn->rac_timeout < *min_timeoutp)
                        *min_timeoutp = conn->rac_timeout;
                if (conn->rac_keepalive < *min_timeoutp)
                        *min_timeoutp = conn->rac_keepalive;

                rc = kranal_check_conn(conn);
                if (rc == 0)
                        continue;

                kranal_conn_addref(conn);
                read_unlock(&kranal_data.kra_global_lock);

                CERROR("Check on conn to "LPX64" failed: %d\n",
                       conn->rac_peer->rap_nid, rc);

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (!conn->rac_closing)
                        kranal_close_conn_locked(conn, -ETIMEDOUT);
                else
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

                kranal_conn_decref(conn);

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock(&kranal_data.kra_global_lock);
}

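/* Connection daemon: dequeues peers flagged for connection by
 * kranal_launch_tx() and attempts to connect to them. */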
int
kranal_connd (void *arg)
{
        char               name[16];
        wait_queue_t       wait;
        unsigned long      flags;
        kra_peer_t        *peer;

        snprintf(name, sizeof(name), "kranal_connd_%02ld", (long)arg);
        kportal_daemonize(name);
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* Safe: kra_shutdown only set when quiescent */

                if (!list_empty(&kranal_data.kra_connd_peers)) {
                        peer = list_entry(kranal_data.kra_connd_peers.next,
                                          kra_peer_t, rap_connd_list);

                        list_del_init(&peer->rap_connd_list);
                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        kranal_connect(peer);
                        kranal_peer_decref(peer);

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        continue;
                }

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                schedule();

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
        }

        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

        kranal_thread_fini();
        return 0;
}

void
kranal_update_reaper_timeout(long timeout)
{
        unsigned long   flags;

        LASSERT (timeout > 0);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        if (timeout < kranal_data.kra_new_min_timeout)
                kranal_data.kra_new_min_timeout = timeout;

        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
}

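/* Reaper: steps through the connection hash table at a rate derived from
 * the smallest timeout/keepalive interval, closing conns that fail their
 * health check. */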
int
kranal_reaper (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        long               timeout;
        int                i;
        int                conn_entries = kranal_data.kra_conn_hash_size;
        int                conn_index = 0;
        int                base_index = conn_entries - 1;
        unsigned long      next_check_time = jiffies;
        long               next_min_timeout = MAX_SCHEDULE_TIMEOUT;
        long               current_min_timeout = 1;

        kportal_daemonize("kranal_reaper");
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
        kranal_data.kra_new_min_timeout = 1;

        while (!kranal_data.kra_shutdown) {

                /* careful with the jiffy wrap... */
                timeout = (long)(next_check_time - jiffies);
                if (timeout <= 0) {

                        /* I wake up every 'p' seconds to check for
                         * timeouts on some more peers.  I try to check
                         * every connection 'n' times within the global
                         * minimum of all keepalive and timeout intervals,
                         * to ensure I attend to every connection within
                         * (n+1)/n times its timeout intervals. */

                        const int     p = 1;
                        const int     n = 3;
                        unsigned long min_timeout;
                        int           chunk;

                        if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
                                /* new min timeout set: restart min timeout scan */
                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                                base_index = conn_index - 1;
                                if (base_index < 0)
                                        base_index = conn_entries - 1;

                                if (kranal_data.kra_new_min_timeout < current_min_timeout) {
                                        current_min_timeout = kranal_data.kra_new_min_timeout;
                                        CWARN("Set new min timeout %ld\n",
                                              current_min_timeout);
                                }

                                kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        }
                        min_timeout = current_min_timeout;

                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
                                               flags);

                        LASSERT (min_timeout > 0);

                        /* Compute how many table entries to check now so I
                         * get round the whole table fast enough (NB I do
                         * this at fixed intervals of 'p' seconds) */
                        chunk = conn_entries;
                        if (min_timeout > n * p)
                                chunk = (chunk * n * p) / min_timeout;
                        if (chunk == 0)
                                chunk = 1;

                        for (i = 0; i < chunk; i++) {
                                kranal_check_conns(conn_index,
                                                   &next_min_timeout);
                                conn_index = (conn_index + 1) % conn_entries;
                        }

                        next_check_time += p * HZ;

                        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                        if (((conn_index - chunk <= base_index &&
                              base_index < conn_index) ||
                             (conn_index - conn_entries - chunk <= base_index &&
                              base_index < conn_index - conn_entries))) {

                                /* Scanned all conns: set current_min_timeout... */
                                if (current_min_timeout != next_min_timeout) {
                                        current_min_timeout = next_min_timeout;
                                        CWARN("Set new min timeout %ld\n",
                                              current_min_timeout);
                                }

                                /* ...and restart min timeout scan */
                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                                base_index = conn_index - 1;
                                if (base_index < 0)
                                        base_index = conn_entries - 1;
                        }
                }

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                schedule_timeout(timeout);

                spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
        }

        kranal_thread_fini();
        return 0;
}

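/* An RDMA posted on 'cqid' has completed: move the tx that posted it from
 * the rdmaq to the fmaq so its completion message gets sent. */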
void
kranal_process_rdmaq (__u32 cqid)
{
        kra_conn_t          *conn;
        kra_tx_t            *tx;
        RAP_RETURN           rrc;
        unsigned long        flags;
        RAP_RDMA_DESCRIPTOR *desc;

        read_lock(&kranal_data.kra_global_lock);

        conn = kranal_cqid2conn_locked(cqid);
        LASSERT (conn != NULL);

        rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
        LASSERT (rrc == RAP_SUCCESS);

        spin_lock_irqsave(&conn->rac_lock, flags);

        LASSERT (!list_empty(&conn->rac_rdmaq));
        tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
        list_del(&tx->tx_list);

        LASSERT(desc->AppPtr == (void *)tx);
        LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
                tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);

        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
        tx->tx_qtime = jiffies;

        spin_unlock_irqrestore(&conn->rac_lock, flags);

        /* Get conn's fmaq processed, now I've just put something there */
        kranal_schedule_conn(conn);

        read_unlock(&kranal_data.kra_global_lock);
}

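/* Push 'msg' (with optional immediate data) out on conn's FMA; fenced
 * message types use the synchronous send.  Returns -EAGAIN if FMA is out
 * of credits right now. */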
int
kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
               void *immediate, int immediatenob)
{
        int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
        RAP_RETURN rrc;

        LASSERT (sizeof(*msg) <= RANAL_FMA_PREFIX_LEN);
        LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
                 immediatenob <= RANAL_FMA_MAX_DATA_LEN :
                 immediatenob == 0);

        msg->ram_incarnation = conn->rac_my_incarnation;
        msg->ram_seq = conn->rac_tx_seq;

        if (sync)
                rrc = RapkFmaSyncSend(conn->rac_device->rad_handle,
                                      immediate, immediatenob,
                                      msg, sizeof(*msg));
        else
                rrc = RapkFmaSend(conn->rac_device->rad_handle,
                                  immediate, immediatenob,
                                  msg, sizeof(*msg));

        switch (rrc) {
        default:
                LBUG();

        case RAP_SUCCESS:
                conn->rac_last_tx = jiffies;
                conn->rac_tx_seq++;
                return 0;

        case RAP_NOT_DONE:
                return -EAGAIN;
        }
}

int
kranal_process_fmaq (kra_conn_t *conn)
{
        unsigned long flags;
        int           more_to_do;
        kra_tx_t     *tx;
        int           rc;
        int           expect_reply;

        /* NB I will be rescheduled some time via a rad_fma_cq event if my
         * FMA is out of credits when I try to send right now... */

        if (conn->rac_closing) {

                if (!list_empty(&conn->rac_rdmaq)) {
                        /* Can't send CLOSE yet; I'm still waiting for RDMAs I
                         * posted to finish */
                        LASSERT (!conn->rac_close_sent);
                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                        return 0;
                }

                if (conn->rac_close_sent)
                        return 0;

                kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
                rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                conn->rac_close_sent = (rc == 0);
                return 0;
        }

        spin_lock_irqsave(&conn->rac_lock, flags);

        if (list_empty(&conn->rac_fmaq)) {

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                if (time_after_eq(jiffies,
                                  conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                }
                return 0;
        }

        tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
        list_del(&tx->tx_list);
        more_to_do = !list_empty(&conn->rac_fmaq);

        spin_unlock_irqrestore(&conn->rac_lock, flags);

        expect_reply = 0;
        switch (tx->tx_msg.ram_type) {
        default:
                LBUG();

        case RANAL_MSG_IMMEDIATE:
        case RANAL_MSG_PUT_NAK:
        case RANAL_MSG_PUT_DONE:
        case RANAL_MSG_GET_NAK:
        case RANAL_MSG_GET_DONE:
                rc = kranal_sendmsg(conn, &tx->tx_msg,
                                    tx->tx_buffer, tx->tx_nob);
                expect_reply = 0;
                break;

        case RANAL_MSG_PUT_REQ:
                tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                kranal_map_buffer(tx);
                expect_reply = 1;
                break;

        case RANAL_MSG_PUT_ACK:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;

        case RANAL_MSG_GET_REQ:
                kranal_map_buffer(tx);
                tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;
        }

        if (rc == -EAGAIN) {
                /* replace at the head of the list for later */
                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add(&tx->tx_list, &conn->rac_fmaq);
                spin_unlock_irqrestore(&conn->rac_lock, flags);

                return 0;
        }

        LASSERT (rc == 0);

        if (!expect_reply) {
                kranal_tx_done(tx, 0);
        } else {
                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add_tail(&tx->tx_list, &conn->rac_replyq);
                tx->tx_qtime = jiffies;
                spin_unlock_irqrestore(&conn->rac_lock, flags);
        }

        return more_to_do;
}

static inline void
kranal_swab_rdma_desc (kra_rdma_desc_t *d)
{
        __swab64s(&d->rard_key.Key);
        __swab16s(&d->rard_key.Cookie);
        __swab16s(&d->rard_key.MdHandle);
        __swab32s(&d->rard_key.Flags);
        __swab64s(&d->rard_addr.AddressBits);
        __swab32s(&d->rard_nob);
}

kra_tx_t *
kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
{
        struct list_head *ttmp;
        kra_tx_t         *tx;

        list_for_each(ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_msg.ram_type != type) {
                        CWARN("Unexpected type %x (%x expected) "
                              "matched reply from "LPX64"\n",
                              tx->tx_msg.ram_type, type,
                              conn->rac_peer->rap_nid);
                        return NULL;
                }

                return tx;
        }

        CWARN("Unmatched reply from "LPX64"\n", conn->rac_peer->rap_nid);
        return NULL;
}

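/* Pull the next message off conn's FMA prefix queue, sanity-check its
 * header and dispatch it; returns 0 immediately if nothing was pending. */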
1441 int
1442 kranal_process_receives(kra_conn_t *conn)
1443 {
1444         unsigned long flags;
1445         __u32         seq;
1446         __u32         nob;
1447         kra_tx_t     *tx;
1448         kra_msg_t    *msg;
1449         void         *prefix;
1450         RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
1451         kra_peer_t   *peer = conn->rac_peer;
1452
1453         if (rrc == RAP_NOT_DONE)
1454                 return 0;
1455         
1456         LASSERT (rrc == RAP_SUCCESS);
1457         conn->rac_last_rx = jiffies;
1458         seq = conn->rac_rx_seq++;
1459         msg = (kra_msg_t *)prefix;
1460
        if (msg->ram_magic != RANAL_MSG_MAGIC) {
                if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
                        CERROR("Unexpected magic %08x from "LPX64"\n",
                               msg->ram_magic, peer->rap_nid);
                        goto out;
                }

                __swab32s(&msg->ram_magic);
                __swab16s(&msg->ram_version);
                __swab16s(&msg->ram_type);
                __swab64s(&msg->ram_srcnid);
                __swab64s(&msg->ram_incarnation);
                __swab32s(&msg->ram_seq);

                /* NB message type checked below; NOT here... */
                switch (msg->ram_type) {
                case RANAL_MSG_PUT_ACK:
                        kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
                        break;

                case RANAL_MSG_GET_REQ:
                        kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
                        break;

                default:
                        break;
                }
        }

        if (msg->ram_version != RANAL_MSG_VERSION) {
                CERROR("Unexpected protocol version %d from "LPX64"\n",
                       msg->ram_version, peer->rap_nid);
                goto out;
        }

        if (msg->ram_srcnid != peer->rap_nid) {
                CERROR("Unexpected peer "LPX64" from "LPX64"\n",
                       msg->ram_srcnid, peer->rap_nid);
                goto out;
        }

        if (msg->ram_incarnation != conn->rac_peer_incarnation) {
                CERROR("Unexpected incarnation "LPX64"("LPX64
                       " expected) from "LPX64"\n",
                       msg->ram_incarnation, conn->rac_peer_incarnation,
                       peer->rap_nid);
                goto out;
        }

        if (msg->ram_seq != seq) {
                CERROR("Unexpected sequence number %d(%d expected) from "
                       LPX64"\n", msg->ram_seq, seq, peer->rap_nid);
                goto out;
        }

        if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
                /* This message signals RDMA completion: wait now... */
                rrc = RapkFmaSyncWait(conn->rac_rihandle);
                LASSERT (rrc == RAP_SUCCESS);
        }

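        /* CLOSE handshake: both sides must send and receive a CLOSE before
         * the connection can be torn down.  If the peer initiated, start
         * closing here; if our own CLOSE has already been sent, this
         * completes the handshake and the connection can be terminated. */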
        if (msg->ram_type == RANAL_MSG_CLOSE) {
                conn->rac_close_recvd = 1;
                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (!conn->rac_closing)
                        kranal_close_conn_locked(conn, -ETIMEDOUT);
                else if (conn->rac_close_sent)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                goto out;
        }

        if (conn->rac_closing)
                goto out;

        conn->rac_rxmsg = msg;                  /* stash message for portals callbacks */
                                                /* they'll NULL rac_rxmsg if they consume it */
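        /* Dispatch.  PUT: the REQ carries the portals header; the sink
         * replies with an ACK holding its RDMA descriptor (the source then
         * does the transfer and sends DONE) or with a NAK.  GET: the REQ
         * carries both header and descriptor; the sink RDMAs the data and
         * replies DONE, or NAKs.  ACK/NAK/DONE are matched back to the
         * original tx by cookie via kranal_match_reply() above. */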
        switch (msg->ram_type) {
        case RANAL_MSG_NOOP:
                /* Nothing to do; just a keepalive */
                break;

        case RANAL_MSG_IMMEDIATE:
                lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);
                break;

        case RANAL_MSG_PUT_REQ:
                lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);

                if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
                        break;

                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);
                if (tx == NULL)
                        break;

                tx->tx_msg.ram_u.completion.racm_cookie =
                        msg->ram_u.putreq.raprm_cookie;
                kranal_post_fma(conn, tx);
                break;

        case RANAL_MSG_PUT_NAK:
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_PUT_ACK:
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.putack.rapam_src_cookie);
                if (tx == NULL)
                        break;

                kranal_rdma(tx, RANAL_MSG_PUT_DONE,
                            &msg->ram_u.putack.rapam_desc,
                            msg->ram_u.putack.rapam_desc.rard_nob,
                            msg->ram_u.putack.rapam_dst_cookie);
                break;

        case RANAL_MSG_PUT_DONE:
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, 0);
                break;

        case RANAL_MSG_GET_REQ:
                lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn);

                if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
                        break;

                tx = kranal_new_tx_msg(0, RANAL_MSG_GET_NAK);
                if (tx == NULL)
                        break;

                tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie;
                kranal_post_fma(conn, tx);
                break;

        case RANAL_MSG_GET_NAK:
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_GET_DONE:
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, 0);
                break;

        default:
                /* This is the type check promised above */
                CERROR("Unknown msg type %x from "LPX64"\n",
                       msg->ram_type, peer->rap_nid);
                break;
        }

 out:
        if (conn->rac_rxmsg != NULL)
                kranal_consume_rxmsg(conn, NULL, 0);

        return 1;
}

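/* Scheduler thread: one per RapidArray device.  It polls the device's
 * completion queues whenever the device callback flags the device ready,
 * services connections queued on rad_connq, and sleeps on rad_waitq when
 * there is nothing to do.  RANAL_RESCHED bounds how long it may hog the
 * CPU before yielding. */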
int
kranal_scheduler (void *arg)
{
        kra_device_t   *dev = (kra_device_t *)arg;
        wait_queue_t    wait;
        char            name[16];
        kra_conn_t     *conn;
        unsigned long   flags;
        RAP_RETURN      rrc;
        int             resched;
        __u32           cqid;
        __u32           event_type;
        int             did_something;
        int             busy_loops = 0;

        snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
        kportal_daemonize(name);
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&dev->rad_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* Safe: kra_shutdown only set when quiescent */

                if (busy_loops++ >= RANAL_RESCHED) {
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        our_cond_resched();
                        busy_loops = 0;

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                did_something = 0;

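                /* Drain the device CQs: rad_rdma_cq completes RDMA
                 * transfers, rad_fma_cq signals incoming FMA messages.
                 * The RDMA CQ must never overrun (asserted below); an FMA
                 * CQ overrun loses the cqid, so every conn on the device
                 * has to be rescheduled. */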
                if (dev->rad_ready) {
                        dev->rad_ready = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        rrc = RapkCQDone(dev->rad_rdma_cq, &cqid, &event_type);

                        LASSERT (rrc == RAP_SUCCESS || rrc == RAP_NOT_DONE);
                        LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);

                        if (rrc == RAP_SUCCESS) {
                                kranal_process_rdmaq(cqid);
                                did_something = 1;
                        }

                        rrc = RapkCQDone(dev->rad_fma_cq, &cqid, &event_type);
                        LASSERT (rrc == RAP_SUCCESS || rrc == RAP_NOT_DONE);

                        if (rrc == RAP_SUCCESS) {
                                if ((event_type & RAPK_CQ_EVENT_OVERRUN) != 0)
                                        kranal_schedule_dev(dev);
                                else
                                        kranal_schedule_cqid(cqid);
                                did_something = 1;
                        }

                        spin_lock_irqsave(&dev->rad_lock, flags);

                        /* If there were no completions to handle, I leave
                         * rad_ready clear.  NB I cleared it BEFORE I checked
                         * the completion queues since I'm racing with the
                         * device callback. */

                        if (did_something)
                                dev->rad_ready = 1;
                }

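                /* Service one scheduled connection per pass: send anything
                 * queued for FMA, then process received messages.  A conn
                 * with more to do goes back on the tail of rad_connq so
                 * other conns aren't starved; otherwise the scheduler's
                 * ref on it is dropped. */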
                if (!list_empty(&dev->rad_connq)) {
                        conn = list_entry(dev->rad_connq.next,
                                          kra_conn_t, rac_schedlist);
                        list_del(&conn->rac_schedlist);
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        LASSERT (conn->rac_scheduled);

                        resched  = kranal_process_fmaq(conn);
                        resched |= kranal_process_receives(conn);
                        did_something = 1;

                        spin_lock_irqsave(&dev->rad_lock, flags);
                        if (resched) {
                                list_add_tail(&conn->rac_schedlist,
                                              &dev->rad_connq);
                        } else {
                                /* Nothing left to do: clear the scheduled
                                 * flag and drop the ref taken in
                                 * kranal_schedule_conn() so the conn can
                                 * be scheduled again */
                                conn->rac_scheduled = 0;
                                spin_unlock_irqrestore(&dev->rad_lock, flags);
                                kranal_conn_decref(conn);
                                spin_lock_irqsave(&dev->rad_lock, flags);
                        }
                }

                if (did_something)
                        continue;

                add_wait_queue(&dev->rad_waitq, &wait);
                set_current_state(TASK_INTERRUPTIBLE);

                spin_unlock_irqrestore(&dev->rad_lock, flags);

                busy_loops = 0;
                schedule();

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&dev->rad_waitq, &wait);

                spin_lock_irqsave(&dev->rad_lock, flags);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);

        kranal_thread_fini();
        return 0;
}
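/* Method table hooking this NAL into the portals library: iov and
 * page-based send/receive, plus the trivial distance metric above. */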
lib_nal_t kranal_lib = {
        libnal_data:        &kranal_data,      /* NAL private data */
        libnal_send:         kranal_send,
        libnal_send_pages:   kranal_send_pages,
        libnal_recv:         kranal_recv,
        libnal_recv_pages:   kranal_recv_pages,
        libnal_dist:         kranal_dist
};