/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "ranal.h"

int
kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
{
        /* I would guess that if kranal_get_peer (nid) == NULL,
           and we're not routing, then 'nid' is very distant :) */
        if (nal->libnal_ni.ni_pid.nid == nid) {
                *dist = 0;
        } else {
                *dist = 1;
        }

        return 0;
}

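/* RapidArray device event callback: mark the matching device ready and
 * wake its scheduler thread.  NB the irqsave locking suggests this can
 * be called in interrupt context. */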
void
kranal_device_callback(RAP_INT32 devid)
{
        kra_device_t *dev;
        unsigned long flags;
        int           i;

        for (i = 0; i < kranal_data.kra_ndevs; i++) {

                dev = &kranal_data.kra_devices[i];
                if (dev->rad_id != devid)
                        continue;

                spin_lock_irqsave(&dev->rad_lock, flags);

                if (!dev->rad_ready) {
                        dev->rad_ready = 1;
                        wake_up(&dev->rad_waitq);
                }

                spin_unlock_irqrestore(&dev->rad_lock, flags);
                return;
        }

        CWARN("callback for unknown device %d\n", devid);
}

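/* Queue 'conn' on its device's scheduler queue, taking a reference on
 * behalf of the scheduler; a no-op if the conn is already scheduled. */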
void
kranal_schedule_conn(kra_conn_t *conn)
{
        kra_device_t    *dev = conn->rac_device;
        unsigned long    flags;

        spin_lock_irqsave(&dev->rad_lock, flags);

        if (!conn->rac_scheduled) {
                kranal_conn_addref(conn);       /* +1 ref for scheduler */
                conn->rac_scheduled = 1;
                list_add_tail(&conn->rac_schedlist, &dev->rad_connq);
                wake_up(&dev->rad_waitq);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);
}

void
kranal_schedule_cqid (__u32 cqid)
{
        kra_conn_t *conn;

        read_lock(&kranal_data.kra_global_lock);

        conn = kranal_cqid2conn_locked(cqid);

        if (conn == NULL)
                CWARN("no cqid %x\n", cqid);
        else
                kranal_schedule_conn(conn);

        read_unlock(&kranal_data.kra_global_lock);
}

void
kranal_schedule_dev(kra_device_t *dev)
{
        kra_conn_t         *conn;
        struct list_head   *conns;
        struct list_head   *tmp;
        int                 i;

        /* Don't do this in IRQ context (servers may have 1000s of clients) */
        LASSERT (!in_interrupt());

        CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);

        for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {

                /* Drop the lock on each hash bucket to ensure we don't
                 * block anyone for too long at IRQ priority on another CPU */

                read_lock(&kranal_data.kra_global_lock);

                conns = &kranal_data.kra_conns[i];

                list_for_each (tmp, conns) {
                        conn = list_entry(tmp, kra_conn_t, rac_hashlist);

                        if (conn->rac_device == dev)
                                kranal_schedule_conn(conn);
                }
                read_unlock(&kranal_data.kra_global_lock);
        }
}

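/* Complete a tx: unmap any mapped buffer, finalise its portals messages
 * and return the descriptor to the appropriate idle pool.  'completion'
 * is 0 on success or a negative errno. */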
void
kranal_tx_done (kra_tx_t *tx, int completion)
{
        ptl_err_t        ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
        kra_device_t    *dev;
        unsigned long    flags;
        int              i;
        RAP_RETURN       rrc;

        LASSERT (!in_interrupt());

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_UNMAPPED:
        case RANAL_BUF_VIRT_UNMAPPED:
                break;

        case RANAL_BUF_PHYS_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
                                           dev->rad_ptag, &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                break;

        case RANAL_BUF_VIRT_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
                                           dev->rad_ptag, &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                break;
        }

        for (i = 0; i < 2; i++) {
                /* tx may have up to 2 libmsgs to finalise */
                if (tx->tx_libmsg[i] == NULL)
                        continue;

                lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
                tx->tx_libmsg[i] = NULL;
        }

        tx->tx_buftype = RANAL_BUF_NONE;
        tx->tx_msg.ram_type = RANAL_MSG_NONE;
        tx->tx_conn = NULL;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        if (tx->tx_isnblk) {
                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
        } else {
                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
                wake_up(&kranal_data.kra_idle_tx_waitq);
        }

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
}

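/* Grab an idle tx descriptor.  If 'may_block' we block until a normal
 * descriptor frees up; otherwise we may dip into the reserved
 * non-blocking pool and can return NULL if that too is exhausted. */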
kra_tx_t *
kranal_get_idle_tx (int may_block)
{
        unsigned long  flags;
        kra_tx_t      *tx = NULL;

        for (;;) {
                spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

                /* "normal" descriptor is free */
                if (!list_empty(&kranal_data.kra_idle_txs)) {
                        tx = list_entry(kranal_data.kra_idle_txs.next,
                                        kra_tx_t, tx_list);
                        break;
                }

                if (!may_block) {
                        /* may dip into reserve pool */
                        if (list_empty(&kranal_data.kra_idle_nblk_txs)) {
                                CERROR("reserved tx desc pool exhausted\n");
                                break;
                        }

                        tx = list_entry(kranal_data.kra_idle_nblk_txs.next,
                                        kra_tx_t, tx_list);
                        break;
                }

                /* block for idle tx */
                spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

                wait_event(kranal_data.kra_idle_tx_waitq,
                           !list_empty(&kranal_data.kra_idle_txs));
        }

        if (tx != NULL) {
                list_del(&tx->tx_list);

                /* Allocate a new completion cookie.  It might not be
                 * needed, but we've got a lock right now... */
                tx->tx_cookie = kranal_data.kra_next_tx_cookie++;

                LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
                LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
                LASSERT (tx->tx_conn == NULL);
                LASSERT (tx->tx_libmsg[0] == NULL);
                LASSERT (tx->tx_libmsg[1] == NULL);
        }

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        return tx;
}

void
kranal_init_msg(kra_msg_t *msg, int type)
{
        msg->ram_magic = RANAL_MSG_MAGIC;
        msg->ram_version = RANAL_MSG_VERSION;
        msg->ram_type = type;
        msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
        /* ram_incarnation gets set when FMA is sent */
}

kra_tx_t *
kranal_new_tx_msg (int may_block, int type)
{
        kra_tx_t *tx = kranal_get_idle_tx(may_block);

        if (tx == NULL)
                return NULL;

        kranal_init_msg(&tx->tx_msg, type);
        return tx;
}

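/* The immediate buffer must be a single contiguous fragment: skip to
 * 'offset' in the iov and point tx_buffer straight at the payload. */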
int
kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                               int offset, int nob)
{
        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");
                return -EMSGSIZE;
        }

        tx->tx_buftype = RANAL_BUF_IMMEDIATE;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        return 0;
}

int
kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                          int offset, int nob)
{
        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");
                return -EMSGSIZE;
        }

        tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        return 0;
}

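/* Build the RAP_PHYS_REGION array for a paged payload.  Apart from the
 * first and last fragments, every page must be fully covered: the RDMA
 * engine can't handle gaps in I/O VM. */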
int
kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
                          int offset, int nob)
{
        RAP_PHYS_REGION *phys = tx->tx_phys;
        int              resid;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = NULL;
        tx->tx_phys_offset = kiov->kiov_offset + offset;

        phys->Address = kranal_page2phys(kiov->kiov_page);
        phys->Length  = PAGE_SIZE;
        phys++;

        resid = nob - (kiov->kiov_len - offset);
        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        int nphys = (int)(phys - tx->tx_phys);
                        int i;

                        /* Can't have gaps */
                        CERROR("Can't make payload contiguous in I/O VM:"
                               "page %d, offset %d, len %d \n", nphys,
                               kiov->kiov_offset, kiov->kiov_len);

                        for (i = -nphys; i < nkiov; i++) {
                                CERROR("kiov[%d] %p +%d for %d\n",
                                       i, kiov[i].kiov_page,
                                       kiov[i].kiov_offset, kiov[i].kiov_len);
                        }

                        return -EINVAL;
                }

                if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) {
                        CERROR ("payload too big (%d)\n",
                                (int)(phys - tx->tx_phys));
                        return -EMSGSIZE;
                }

                phys->Address = kranal_page2phys(kiov->kiov_page);
                phys->Length  = PAGE_SIZE;
                phys++;

                resid -= PAGE_SIZE;
        }

        tx->tx_phys_npages = phys - tx->tx_phys;
        return 0;
}

static inline int
kranal_setup_buffer (kra_tx_t *tx, int niov,
                     struct iovec *iov, ptl_kiov_t *kiov,
                     int offset, int nob)
{
        LASSERT ((iov == NULL) != (kiov == NULL));

        if (kiov != NULL)
                return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);

        return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
}

void
kranal_map_buffer (kra_tx_t *tx)
{
        kra_conn_t     *conn = tx->tx_conn;
        kra_device_t   *dev = conn->rac_device;
        RAP_RETURN      rrc;

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_PHYS_UNMAPPED:
                rrc = RapkRegisterPhys(dev->rad_handle,
                                       tx->tx_phys, tx->tx_phys_npages,
                                       dev->rad_ptag,
                                       &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
                return;

        case RANAL_BUF_VIRT_UNMAPPED:
                rrc = RapkRegisterMemory(dev->rad_handle,
                                         tx->tx_buffer, tx->tx_nob,
                                         dev->rad_ptag,
                                         &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
                return;
        }
}

kra_conn_t *
kranal_find_conn_locked (kra_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->rap_conns) {
                return list_entry(tmp, kra_conn_t, rac_list);
        }

        return NULL;
}

void
kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
{
        unsigned long    flags;

        tx->tx_conn = conn;

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);

        kranal_schedule_conn(conn);
}

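/* NB the peer and conn are looked up twice below: once under the read
 * lock and again under the write lock, since another thread may have
 * created or torn down state while the lock was upgraded. */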
void
kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
{
        unsigned long    flags;
        kra_peer_t      *peer;
        kra_conn_t      *conn;
        unsigned long    now;
        rwlock_t        *g_lock = &kranal_data.kra_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */

        read_lock(g_lock);

        peer = kranal_find_peer_locked(nid);
        if (peer == NULL) {
                read_unlock(g_lock);
                kranal_tx_done(tx, -EHOSTUNREACH);
                return;
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                kranal_post_fma(conn, tx);
                read_unlock(g_lock);
                return;
        }

        /* Making one or more connections; I'll need a write lock... */
        read_unlock(g_lock);
        write_lock_irqsave(g_lock, flags);

        peer = kranal_find_peer_locked(nid);
        if (peer == NULL) {
                write_unlock_irqrestore(g_lock, flags);
                kranal_tx_done(tx, -EHOSTUNREACH);
                return;
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kranal_post_fma(conn, tx);
                write_unlock_irqrestore(g_lock, flags);
                return;
        }

        LASSERT (peer->rap_persistence > 0);

        if (!peer->rap_connecting) {
                now = CURRENT_TIME;
                if (now < peer->rap_reconnect_time) {
                        write_unlock_irqrestore(g_lock, flags);
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                peer->rap_connecting = 1;
                kranal_peer_addref(peer); /* extra ref for connd */

                spin_lock(&kranal_data.kra_connd_lock);

                list_add_tail(&peer->rap_connd_list,
                              &kranal_data.kra_connd_peers);
                wake_up(&kranal_data.kra_connd_waitq);

                spin_unlock(&kranal_data.kra_connd_lock);
        }

        /* A connection is being established; queue the message... */
        list_add_tail(&tx->tx_list, &peer->rap_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}

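/* Post the RDMA for 'tx' and queue it on the conn's rdmaq; 'type' is the
 * completion message (PUT_DONE/GET_DONE) prepped now and sent on the
 * fmaq once the RDMA completes. */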
static void
kranal_rdma(kra_tx_t *tx, int type,
            kra_rdma_desc_t *rard, int nob, __u64 cookie)
{
        kra_conn_t    *conn = tx->tx_conn;
        RAP_RETURN     rrc;
        unsigned long  flags;

        /* prep final completion message */
        kranal_init_msg(&tx->tx_msg, type);
        tx->tx_msg.ram_u.completion.racm_cookie = cookie;

        LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
        LASSERT (nob <= rard->rard_nob);

        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
        tx->tx_rdma_desc.SrcPtr = tx->tx_buffer;
        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
        tx->tx_rdma_desc.DstPtr = rard->rard_addr;
        tx->tx_rdma_desc.DstKey = rard->rard_key;
        tx->tx_rdma_desc.Length = nob;
        tx->tx_rdma_desc.AppPtr = tx;

        if (nob == 0) { /* Immediate completion */
                kranal_post_fma(conn, tx);
                return;
        }

        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
        LASSERT (rrc == RAP_SUCCESS);

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);
}

int
kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
{
        __u32      nob_received = nob;
        RAP_RETURN rrc;

        LASSERT (conn->rac_rxmsg != NULL);

        rrc = RapkFmaCopyToUser(conn->rac_rihandle, buffer,
                                &nob_received, sizeof(kra_msg_t));
        LASSERT (rrc == RAP_SUCCESS);

        conn->rac_rxmsg = NULL;

        if (nob_received != nob) {
                CWARN("Expected %d immediate bytes but got %d\n",
                      nob, nob_received);
                return -EPROTO;
        }

        return 0;
}

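/* Common send path for both iovec and kiov payloads: small payloads go
 * as a single IMMEDIATE FMA message; larger PUTs and GETs negotiate an
 * RDMA via PUT_REQ/PUT_ACK or GET_REQ before the data moves. */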
ptl_err_t
kranal_do_send (lib_nal_t    *nal,
                void         *private,
                lib_msg_t    *libmsg,
                ptl_hdr_t    *hdr,
                int           type,
                ptl_nid_t     nid,
                ptl_pid_t     pid,
                unsigned int  niov,
                struct iovec *iov,
                ptl_kiov_t   *kiov,
                size_t        offset,
                size_t        nob)
{
        kra_conn_t *conn;
        kra_tx_t   *tx;
        int         rc;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
               " pid %d\n", nob, niov, nid, pid);

        LASSERT (nob == 0 || niov > 0);
        LASSERT (niov <= PTL_MD_MAX_IOV);

        LASSERT (!in_interrupt());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch(type) {
        default:
                LBUG();

        case PTL_MSG_REPLY: {
                /* reply's 'private' is the conn that received the GET_REQ */
                conn = private;
                LASSERT (conn->rac_rxmsg != NULL);

                if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
                        if (nob > RANAL_MAX_IMMEDIATE) {
                                CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
                                       nob, nid);
                                return PTL_FAIL;
                        }
                        break;                  /* RDMA not expected */
                }

                /* Incoming message consistent with immediate reply? */
                if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
                        CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
                               nid, conn->rac_rxmsg->ram_type);
                        return PTL_FAIL;
                }

                tx = kranal_get_idle_tx(0);
                if (tx == NULL)
                        return PTL_FAIL;

                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_conn = conn;
                tx->tx_libmsg[0] = libmsg;

                kranal_map_buffer(tx);
                kranal_rdma(tx, RANAL_MSG_GET_DONE,
                            &conn->rac_rxmsg->ram_u.getreq.ragm_desc, nob,
                            conn->rac_rxmsg->ram_u.getreq.ragm_cookie);
                return PTL_OK;
        }

        case PTL_MSG_GET:
                if (kiov == NULL &&             /* not paged */
                    nob <= RANAL_MAX_IMMEDIATE && /* small enough */
                    nob <= kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(0, RANAL_MSG_GET_REQ);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg);
                if (tx->tx_libmsg[1] == NULL) {
                        CERROR("Can't create reply for GET to "LPX64"\n", nid);
                        kranal_tx_done(tx, -ENOMEM);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[0] = libmsg;
                tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, nid);
                return PTL_OK;

        case PTL_MSG_ACK:
                LASSERT (nob == 0);
                break;

        case PTL_MSG_PUT:
                if (kiov == NULL &&             /* not paged */
                    nob <= RANAL_MAX_IMMEDIATE && /* small enough */
                    nob <= kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[0] = libmsg;
                tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, nid);
                return PTL_OK;
        }

        /* send IMMEDIATE */

        LASSERT (kiov == NULL);
        LASSERT (nob <= RANAL_MAX_IMMEDIATE);

        tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
                                 type == PTL_MSG_REPLY ||
                                 in_interrupt()),
                               RANAL_MSG_IMMEDIATE);
        if (tx == NULL)
                return PTL_NO_SPACE;

        rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
        if (rc != 0) {
                kranal_tx_done(tx, rc);
                return PTL_FAIL;
        }

        tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
        tx->tx_libmsg[0] = libmsg;
        kranal_launch_tx(tx, nid);
        return PTL_OK;
}

ptl_err_t
kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
             ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
             unsigned int niov, struct iovec *iov,
             size_t offset, size_t len)
{
        return kranal_do_send(nal, private, cookie,
                              hdr, type, nid, pid,
                              niov, iov, NULL,
                              offset, len);
}

ptl_err_t
kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
                   ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                   unsigned int niov, ptl_kiov_t *kiov,
                   size_t offset, size_t len)
{
        return kranal_do_send(nal, private, cookie,
                              hdr, type, nid, pid,
                              niov, NULL, kiov,
                              offset, len);
}

ptl_err_t
kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
                unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
                size_t offset, size_t mlen, size_t rlen)
{
        kra_conn_t  *conn = private;
        kra_msg_t   *rxmsg = conn->rac_rxmsg;
        kra_tx_t    *tx;
        void        *buffer;
        int          rc;

        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch(rxmsg->ram_type) {
        default:
                LBUG();
                return PTL_FAIL;

        case RANAL_MSG_IMMEDIATE:
                if (mlen == 0) {
                        buffer = NULL;
                } else if (kiov != NULL) {
                        CERROR("Can't recv immediate into paged buffer\n");
                        return PTL_FAIL;
                } else {
                        LASSERT (niov > 0);
                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                        }
                        if (mlen > iov->iov_len - offset) {
                                CERROR("Can't handle immediate frags\n");
                                return PTL_FAIL;
                        }
                        buffer = ((char *)iov->iov_base) + offset;
                }
                rc = kranal_consume_rxmsg(conn, buffer, mlen);
                lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
                return PTL_OK;

        case RANAL_MSG_GET_REQ:
                /* If the GET matched, we've already handled it in
                 * kranal_do_send which is called to send the REPLY.  We're
                 * only called here to complete the GET receive (if we needed
                 * it which we don't, but I digress...) */
                LASSERT (libmsg == NULL);
                lib_finalize(nal, NULL, libmsg, PTL_OK);
                return PTL_OK;

        case RANAL_MSG_PUT_REQ:
                if (libmsg == NULL) {           /* PUT didn't match... */
                        lib_finalize(nal, NULL, libmsg, PTL_OK);
                        return PTL_OK;
                }

                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, mlen);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_conn = conn;
                kranal_map_buffer(tx);

                tx->tx_msg.ram_u.putack.rapam_src_cookie =
                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_addr = tx->tx_buffer;
                tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_nob = mlen;

                tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */

                kranal_post_fma(conn, tx);

                /* flag matched by consuming rx message */
                kranal_consume_rxmsg(conn, NULL, 0);
                return PTL_OK;
        }
}

ptl_err_t
kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
             unsigned int niov, struct iovec *iov,
             size_t offset, size_t mlen, size_t rlen)
{
        return kranal_recvmsg(nal, private, msg, niov, iov, NULL,
                              offset, mlen, rlen);
}

ptl_err_t
kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
                   unsigned int niov, ptl_kiov_t *kiov,
                   size_t offset, size_t mlen, size_t rlen)
{
        return kranal_recvmsg(nal, private, msg, niov, NULL, kiov,
                              offset, mlen, rlen);
}

int
kranal_thread_start (int(*fn)(void *arg), void *arg)
{
        long    pid = kernel_thread(fn, arg, 0);

        if (pid < 0)
                return (int)pid;

        atomic_inc(&kranal_data.kra_nthreads);
        return 0;
}

void
kranal_thread_fini (void)
{
        atomic_dec(&kranal_data.kra_nthreads);
}

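/* Health check for a single conn: schedule a keepalive if we haven't
 * sent recently, and time out the conn if nothing has been received or
 * if any of its queues (fmaq/rdmaq/replyq) has stalled. */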
int
kranal_check_conn (kra_conn_t *conn)
{
        kra_tx_t          *tx;
        struct list_head  *ttmp;
        unsigned long      flags;
        long               timeout;
        unsigned long      now = jiffies;

        if (!conn->rac_closing &&
            time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                /* not sent in a while; schedule conn so scheduler sends a keepalive */
                kranal_schedule_conn(conn);
        }

        /* wait twice as long for CLOSE to be sure peer is dead */
        timeout = (conn->rac_closing ? 1 : 2) * conn->rac_timeout * HZ;

        if (!conn->rac_close_recvd &&
            time_after_eq(now, conn->rac_last_rx + timeout)) {
                CERROR("Nothing received from "LPX64" within %lu seconds\n",
                       conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
                return -ETIMEDOUT;
        }

        if (conn->rac_closing)
                return 0;

        /* Check the conn's queues are moving.  These are "belt+braces" checks,
         * in case of hardware/software errors that make this conn seem
         * responsive even though it isn't progressing its message queues. */

        spin_lock_irqsave(&conn->rac_lock, flags);

        list_for_each (ttmp, &conn->rac_fmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_rdmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        return 0;
}

void
kranal_check_conns (int idx, unsigned long *min_timeoutp)
{
        struct list_head  *conns = &kranal_data.kra_conns[idx];
        struct list_head  *ctmp;
        kra_conn_t        *conn;
        unsigned long      flags;
        int                rc;

 again:
        /* NB. We expect to check all the conns and not find any problems, so
         * we just use a shared lock while we take a look... */
        read_lock(&kranal_data.kra_global_lock);

        list_for_each (ctmp, conns) {
                conn = list_entry(ctmp, kra_conn_t, rac_hashlist);

                if (conn->rac_timeout < *min_timeoutp)
                        *min_timeoutp = conn->rac_timeout;
                if (conn->rac_keepalive < *min_timeoutp)
                        *min_timeoutp = conn->rac_keepalive;

                rc = kranal_check_conn(conn);
                if (rc == 0)
                        continue;

                kranal_conn_addref(conn);
                read_unlock(&kranal_data.kra_global_lock);

                CERROR("Check on conn to "LPX64" failed: %d\n",
                       conn->rac_peer->rap_nid, rc);

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (!conn->rac_closing)
                        kranal_close_conn_locked(conn, -ETIMEDOUT);
                else
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

                kranal_conn_decref(conn);

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock(&kranal_data.kra_global_lock);
}

int
kranal_connd (void *arg)
{
        char               name[16];
        wait_queue_t       wait;
        unsigned long      flags;
        kra_peer_t        *peer;

        snprintf(name, sizeof(name), "kranal_connd_%02ld", (long)arg);
        kportal_daemonize(name);
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* Safe: kra_shutdown only set when quiescent */

                if (!list_empty(&kranal_data.kra_connd_peers)) {
                        peer = list_entry(kranal_data.kra_connd_peers.next,
                                          kra_peer_t, rap_connd_list);

                        list_del_init(&peer->rap_connd_list);
                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        kranal_connect(peer);
                        kranal_put_peer(peer);

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        continue;
                }

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                schedule();

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
        }

        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

        kranal_thread_fini();
        return 0;
}

void
kranal_update_reaper_timeout(long timeout)
{
        unsigned long   flags;

        LASSERT (timeout > 0);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        if (timeout < kranal_data.kra_new_min_timeout)
                kranal_data.kra_new_min_timeout = timeout;

        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
}

int
kranal_reaper (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        long               timeout;
        int                i;
        int                conn_entries = kranal_data.kra_conn_hash_size;
        int                conn_index = 0;
        int                base_index = conn_entries - 1;
        unsigned long      next_check_time = jiffies;
        long               next_min_timeout = MAX_SCHEDULE_TIMEOUT;
        long               current_min_timeout = 1;

        kportal_daemonize("kranal_reaper");
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
        kranal_data.kra_new_min_timeout = 1;

        while (!kranal_data.kra_shutdown) {

                /* careful with the jiffy wrap... */
                timeout = (long)(next_check_time - jiffies);
                if (timeout <= 0) {

                        /* I wake up every 'p' seconds to check for
                         * timeouts on some more peers.  I try to check
                         * every connection 'n' times within the global
                         * minimum of all keepalive and timeout intervals,
                         * to ensure I attend to every connection within
                         * (n+1)/n times its timeout intervals. */

                        const int     p = 1;
                        const int     n = 3;
                        unsigned long min_timeout;
                        int           chunk;

                        if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
                                /* new min timeout set: restart min timeout scan */
                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                                base_index = conn_index - 1;
                                if (base_index < 0)
                                        base_index = conn_entries - 1;

                                if (kranal_data.kra_new_min_timeout < current_min_timeout) {
                                        current_min_timeout = kranal_data.kra_new_min_timeout;
                                        CWARN("Set new min timeout %ld\n",
                                              current_min_timeout);
                                }

                                kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        }
                        min_timeout = current_min_timeout;

                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
                                               flags);

                        LASSERT (min_timeout > 0);

                        /* Compute how many table entries to check now so I
                         * get round the whole table fast enough (NB I do
                         * this at fixed intervals of 'p' seconds) */
                        chunk = conn_entries;
                        if (min_timeout > n * p)
                                chunk = (chunk * n * p) / min_timeout;
                        if (chunk == 0)
                                chunk = 1;

                        for (i = 0; i < chunk; i++) {
                                kranal_check_conns(conn_index,
                                                   &next_min_timeout);
                                conn_index = (conn_index + 1) % conn_entries;
                        }

                        next_check_time += p * HZ;

                        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                        if (((conn_index - chunk <= base_index &&
                              base_index < conn_index) ||
                             (conn_index - conn_entries - chunk <= base_index &&
                              base_index < conn_index - conn_entries))) {

                                /* Scanned all conns: set current_min_timeout... */
                                if (current_min_timeout != next_min_timeout) {
                                        current_min_timeout = next_min_timeout;
                                        CWARN("Set new min timeout %ld\n",
                                              current_min_timeout);
                                }

                                /* ...and restart min timeout scan */
                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                                base_index = conn_index - 1;
                                if (base_index < 0)
                                        base_index = conn_entries - 1;
                        }
                }

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                schedule_timeout(timeout);

                spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
        }

        kranal_thread_fini();
        return 0;
}

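/* An RDMA completed on 'cqid': move the tx at the head of the conn's
 * rdmaq onto its fmaq so the scheduler sends the PUT_DONE/GET_DONE
 * completion message. */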
void
kranal_process_rdmaq (__u32 cqid)
{
        kra_conn_t          *conn;
        kra_tx_t            *tx;
        RAP_RETURN           rrc;
        unsigned long        flags;
        RAP_RDMA_DESCRIPTOR *desc;

        read_lock(&kranal_data.kra_global_lock);

        conn = kranal_cqid2conn_locked(cqid);
        LASSERT (conn != NULL);

        rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
        LASSERT (rrc == RAP_SUCCESS);

        spin_lock_irqsave(&conn->rac_lock, flags);

        LASSERT (!list_empty(&conn->rac_rdmaq));
        tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
        list_del(&tx->tx_list);

        LASSERT (desc->AppPtr == (void *)tx);
        LASSERT (tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
                 tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);

        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
        tx->tx_qtime = jiffies;

        spin_unlock_irqrestore(&conn->rac_lock, flags);

        /* Get conn's fmaq processed, now I've just put something there */
        kranal_schedule_conn(conn);

        read_unlock(&kranal_data.kra_global_lock);
}

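/* Send one FMA message (with optional immediate payload) on 'conn';
 * returns 0 on success or -EAGAIN if FMA credits are exhausted.  FENCEd
 * types go via the synchronous send, presumably so the message can't
 * overtake the RDMA it completes. */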
int
kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
               void *immediate, int immediatenob)
{
        int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
        RAP_RETURN rrc;

        LASSERT (sizeof(*msg) <= RANAL_FMA_PREFIX_LEN);
        LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
                 immediatenob <= RANAL_FMA_MAX_DATA_LEN :
                 immediatenob == 0);

        msg->ram_incarnation = conn->rac_incarnation;
        msg->ram_seq = conn->rac_tx_seq;

        if (sync)
                rrc = RapkFmaSyncSend(conn->rac_device->rad_handle,
                                      immediate, immediatenob,
                                      msg, sizeof(*msg));
        else
                rrc = RapkFmaSend(conn->rac_device->rad_handle,
                                  immediate, immediatenob,
                                  msg, sizeof(*msg));

        switch (rrc) {
        case RAP_SUCCESS:
                conn->rac_last_tx = jiffies;
                conn->rac_tx_seq++;
                return 0;

        case RAP_NOT_DONE:
                return -EAGAIN;

        default:
                LBUG();
        }
}

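/* Pull the next tx off the conn's fmaq and send it; NOOPs keep an idle
 * conn alive, and a CLOSE handshake is run instead once the conn starts
 * closing.  Returns nonzero if there is more queued work. */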
int
kranal_process_fmaq (kra_conn_t *conn)
{
        unsigned long flags;
        int           more_to_do;
        kra_tx_t     *tx;
        int           rc;
        int           expect_reply;

        /* NB I will be rescheduled via a rad_fma_cq event if my FMA is
         * out of credits when I try to send right now... */

        if (conn->rac_closing) {

                if (!list_empty(&conn->rac_rdmaq)) {
                        /* Can't send CLOSE yet; I'm still waiting for RDMAs I
                         * posted to finish */
                        LASSERT (!conn->rac_close_sent);
                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                        return 0;
                }

                if (conn->rac_close_sent)
                        return 0;

                kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
                rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                conn->rac_close_sent = (rc == 0);
                return 0;
        }

        spin_lock_irqsave(&conn->rac_lock, flags);

        if (list_empty(&conn->rac_fmaq)) {

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                if (time_after_eq(jiffies,
                                  conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                }
                return 0;
        }

        tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
        list_del(&tx->tx_list);
        more_to_do = !list_empty(&conn->rac_fmaq);

        spin_unlock_irqrestore(&conn->rac_lock, flags);

        expect_reply = 0;
        switch (tx->tx_msg.ram_type) {
        default:
                LBUG();

        case RANAL_MSG_IMMEDIATE:
        case RANAL_MSG_PUT_NAK:
        case RANAL_MSG_PUT_DONE:
        case RANAL_MSG_GET_NAK:
        case RANAL_MSG_GET_DONE:
                rc = kranal_sendmsg(conn, &tx->tx_msg,
                                    tx->tx_buffer, tx->tx_nob);
                expect_reply = 0;
                break;

        case RANAL_MSG_PUT_REQ:
                tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                kranal_map_buffer(tx);
                expect_reply = 1;
                break;

        case RANAL_MSG_PUT_ACK:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;

        case RANAL_MSG_GET_REQ:
                kranal_map_buffer(tx);
                tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.get.ragm_desc.rard_addr = tx->tx_buffer;
                tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;
        }

        if (rc == -EAGAIN) {
                /* replace at the head of the list for later */
                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add(&tx->tx_list, &conn->rac_fmaq);
                spin_unlock_irqrestore(&conn->rac_lock, flags);

                return 0;
        }

        LASSERT (rc == 0);

        if (!expect_reply) {
                kranal_tx_done(tx, 0);
        } else {
                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add_tail(&tx->tx_list, &conn->rac_replyq);
                tx->tx_qtime = jiffies;
                spin_unlock_irqrestore(&conn->rac_lock, flags);
        }

        return more_to_do;
}

static inline void
kranal_swab_rdma_desc (kra_rdma_desc_t *d)
{
        __swab64s(&d->rard_key.Key);
        __swab16s(&d->rard_key.Cookie);
        __swab16s(&d->rard_key.MdHandle);
        __swab32s(&d->rard_key.Flags);
        __swab64s(&d->rard_addr);
        __swab32s(&d->rard_nob);
}

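/* Look up the tx on the conn's replyq matching a completion cookie;
 * warns and returns NULL on a missing cookie or a type mismatch. */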
kra_tx_t *
kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
{
        struct list_head *ttmp;
        kra_tx_t         *tx;

        list_for_each(ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_msg.ram_type != type) {
                        CWARN("Unexpected type %x (%x expected) "
                              "matched reply from "LPX64"\n",
                              tx->tx_msg.ram_type, type,
                              conn->rac_peer->rap_nid);
                        return NULL;
                }

                return tx;
        }

        CWARN("Unmatched reply from "LPX64"\n", conn->rac_peer->rap_nid);
        return NULL;
}

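/* Pull the next FMA message off 'conn' and process it: sanity check the
 * prefix (magic/byte order, version, source nid, incarnation, sequence),
 * honour FENCE and CLOSE semantics, then dispatch on message type. */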
1437 int
1438 kranal_process_receives(kra_conn_t *conn)
1439 {
1440         unsigned long flags;
1441         __u32         seq;
1442         __u32         nob;
1443         kra_msg_t    *msg;
1444         RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &msg);
1445         kra_peer_t   *peer = conn->rac_peer;
1446
1447         if (rrc == RAP_NOT_DONE)
1448                 return 0;
1449         
1450         LASSERT (rrc == RAP_SUCCESS);
1451         conn->rac_last_rx = jiffies;
1452         seq = conn->rac_seq++;
1453
1454         if (msg->ram_magic != RANAL_MSG_MAGIC) {
1455                 if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
1456                         CERROR("Unexpected magic %08x from "LPX64"\n",
1457                                msg->ram_magic, peer->rap_nid);
1458                         goto out;
1459                 }
1460
1461                 __swab32s(&msg->ram_magic);
1462                 __swab16s(&msg->ram_version);
1463                 __swab16s(&msg->ram_type);
1464                 __swab64s(&msg->ram_srcnid);
1465                 __swab64s(&msg->ram_incarnation);
1466                 __swab32s(&msg->ram_seq);
1467
1468                 /* NB message type checked below; NOT here... */
1469                 switch (msg->ram_type) {
1470                 case RANAL_MSG_PUT_ACK:
1471                         kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
1472                         break;
1473
1474                 case RANAL_MSG_GET_REQ:
1475                         kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
1476                         break;
1477                         
1478                 default:
1479                         break;
1480                 }
1481         }
1482
1483         if (msg->ram_version != RANAL_MSG_VERSION) {
1484                 CERROR("Unexpected protocol version %d from "LPX64"\n",
1485                        msg->ram_version, peer->rap_nid);
1486                 goto out;
1487         }
1488
1489         if (msg->ram_srcnid != peer->rap_nid) {
1490                 CERROR("Unexpected peer "LPX64" from "LPX64"\n",
1491                        msg->ram_srcnid, peer->rap_nid);
1492                 goto out;
1493         }
1494         
1495         if (msg->ram_incarnation != conn->rac_incarnation) {
1496                 CERROR("Unexpected incarnation "LPX64"("LPX64
1497                        " expected) from "LPX64"\n",
1498                        msg->ram_incarnation, conn->rac_incarnation,
1499                        peer->rap_nid);
1500                 goto out;
1501         }
1502         
1503         if (msg->ram_seq != seq) {
1504                 CERROR("Unexpected sequence number %d(%d expected) from "
1505                        LPX64"\n", msg->ram_seq, seq, peer->rap_nid);
1506                 goto out;
1507         }
1508
1509         if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
1510                 /* This message signals RDMA completion: wait now... */
1511                 rrc = RapkFmaSyncWait(conn->rac_rihandle);
1512                 LASSERT (rrc == RAP_SUCCESS);
1513         }
1514  
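        /* Close handshake: each side sends CLOSE and waits to receive the
         * peer's CLOSE before final teardown.  This CLOSE either starts
         * the handshake or, if our own CLOSE is already out, completes
         * it. */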
        if (msg->ram_type == RANAL_MSG_CLOSE) {
                conn->rac_close_recvd = 1;
                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (!conn->rac_closing)
                        kranal_close_conn_locked(conn, -ETIMEDOUT);
                else if (conn->rac_close_sent)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                goto out;
        }
1526
1527         if (conn->rac_closing)
1528                 goto out;
1529         
1530         conn->rac_rxmsg = msg;                  /* stash message for portals callbacks */
1531                                                 /* they'll NULL rac_rxmsg if they consume it */
1532         switch (msg->ram_type) {
1533         case RANAL_MSG_NOOP:
1534                 /* Nothing to do; just a keepalive */
1535                 break;
1536                 
1537         case RANAL_MSG_IMMEDIATE:
1538                 lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);
1539                 break;
1540                 
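        /* PUT protocol: PUT_REQ offers the header to portals; if a sink
         * buffer matches, the target replies with PUT_ACK describing
         * where to RDMA the payload and the initiator fences the transfer
         * with PUT_DONE.  PUT_NAK means nothing matched. */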
1541         case RANAL_MSG_PUT_REQ:
1542                 lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);
1543
1544                 if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
1545                         break;
1546
1547                 tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);
1548                 if (tx == NULL)
1549                         break;
1550                 
                tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.putreq.raprm_cookie;
1552                 kranal_post_fma(conn, tx);
1553                 break;
1554
1555         case RANAL_MSG_PUT_NAK:
1556                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1557                                         msg->ram_u.completion.racm_cookie);
1558                 if (tx == NULL)
1559                         break;
1560                 
1561                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1562                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1563                 kranal_tx_done(tx, -ENOENT);    /* no match */
1564                 break;
1565                 
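        /* PUT_ACK carries the peer's RDMA descriptor: RDMA the payload
         * straight into the peer's sink buffer (kranal_rdma() is passed
         * RANAL_MSG_PUT_DONE, presumably the completion message that
         * fences the transfer) */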
1566         case RANAL_MSG_PUT_ACK:
1567                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1568                                         msg->ram_u.putack.rapam_src_cookie);
1569                 if (tx == NULL)
1570                         break;
1571
                kranal_rdma(tx, RANAL_MSG_PUT_DONE,
                            &msg->ram_u.putack.rapam_desc,
                            msg->ram_u.putack.rapam_desc.rard_nob,
                            msg->ram_u.putack.rapam_dst_cookie);
1576                 break;
1577
1578         case RANAL_MSG_PUT_DONE:
1579                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
1580                                         msg->ram_u.completion.racm_cookie);
1581                 if (tx == NULL)
1582                         break;
1583
1584                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1585                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1586                 kranal_tx_done(tx, 0);
1587                 break;
1588
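        /* GET protocol: GET_REQ describes the peer's sink buffer; if
         * lib_parse() matches it, the reply data is presumably moved by
         * RDMA via this NAL's recv path, otherwise GET_NAK tells the peer
         * nothing matched */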
1589         case RANAL_MSG_GET_REQ:
                lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn);
1591                 
1592                 if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
1593                         break;
1594
1595                 tx = kranal_new_tx_msg(0, RANAL_MSG_GET_NAK);
1596                 if (tx == NULL)
1597                         break;
1598
                tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie;
1600                 kranal_post_fma(conn, tx);
1601                 break;
1602                 
1603         case RANAL_MSG_GET_NAK:
1604                 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1605                                         msg->ram_u.completion.racm_cookie);
1606                 if (tx == NULL)
1607                         break;
1608                 
1609                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1610                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1611                 kranal_tx_done(tx, -ENOENT);    /* no match */
1612                 break;
1613                 
1614         case RANAL_MSG_GET_DONE:
1615                 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1616                                         msg->ram_u.completion.racm_cookie);
1617                 if (tx == NULL)
1618                         break;
1619                 
1620                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1621                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1622                 kranal_tx_done(tx, 0);
1623                 break;
1624         }
1625
1626  out:
        if (conn->rac_rxmsg != NULL)
1628                 kranal_consume_rxmsg(conn, NULL, 0);
1629
1630         return 1;
1631 }
1632
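/* Per-device scheduler thread: reaps RDMA and FMA completion events from
 * the device's CQs and services connections queued on rad_connq, sleeping
 * on rad_waitq when there is nothing to do */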
1633 int
1634 kranal_scheduler (void *arg)
1635 {
1636         kra_device_t   *dev = (kra_device_t *)arg;
1637         wait_queue_t    wait;
1638         char            name[16];
1639         kra_conn_t     *conn;
1640         unsigned long   flags;
        RAP_RETURN      rrc;
        RAP_INT32       event_type;
        int             resched;
1643         __u32           cqid;
1644         int             did_something;
1645         int             busy_loops = 0;
1646
        snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
1648         kportal_daemonize(name);
1649         kportal_blockallsigs();
1650
1651         init_waitqueue_entry(&wait, current);
1652
1653         spin_lock_irqsave(&dev->rad_lock, flags);
1654
1655         while (!kranal_data.kra_shutdown) {
1656                 /* Safe: kra_shutdown only set when quiescent */
1657                 
1658                 if (busy_loops++ >= RANAL_RESCHED) {
1659                         spin_unlock_irqrestore(&dev->rad_lock, flags);
1660
1661                         our_cond_resched();
1662                         busy_loops = 0;
1663
1664                         spin_lock_irqsave(&dev->rad_lock, flags);
1665                 }
1666
1667                 did_something = 0;
1668
1669                 if (dev->rad_ready) {
1670                         dev->rad_ready = 0;
1671                         spin_unlock_irqrestore(&dev->rad_lock, flags);
1672
1673                         rrc = RapkCQDone(dev->rad_rdma_cq, &cqid, &event_type);
1674
1675                         LASSERT (rrc == RAP_SUCCESS || rrc == RAP_NOT_DONE);
1676                         LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);
1677                         
1678                         if (rrc == RAP_SUCCESS) {
1679                                 kranal_process_rdmaq(cqid);
1680                                 did_something = 1;
1681                         }
1682                         
1683                         rrc = RapkCQDone(dev->rad_fma_cq, &cqid, &event_type);
1684                         LASSERT (rrc == RAP_SUCCESS || rrc == RAP_NOT_DONE);
1685                         
1686                         if (rrc == RAP_SUCCESS) {
1687                                 if ((event_type & RAPK_CQ_EVENT_OVERRUN) != 0)
1688                                         kranal_schedule_dev(dev);
1689                                 else
1690                                         kranal_schedule_cqid(cqid);
1691                                 did_something = 1;
1692                         }
1693                         
1694                         spin_lock_irqsave(&dev->rad_lock, flags);
1695
1696                         /* If there were no completions to handle, I leave
1697                          * rad_ready clear.  NB I cleared it BEFORE I checked
1698                          * the completion queues since I'm racing with the
1699                          * device callback. */
1700
1701                         if (did_something)
1702                                 dev->rad_ready = 1;
1703                 }
1704                 
                if (!list_empty(&dev->rad_connq)) {
                        /* Connection needs attention */
                        conn = list_entry(dev->rad_connq.next,
                                          kra_conn_t, rac_schedlist);
                        list_del(&conn->rac_schedlist);
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        LASSERT (conn->rac_scheduled);
                        conn->rac_scheduled = 0;

                        resched  = kranal_process_fmaq(conn);
                        resched |= kranal_process_receives(conn);
                        did_something = 1;

                        if (resched)
                                kranal_schedule_conn(conn);

                        /* Drop the scheduler's ref; kranal_schedule_conn()
                         * took a new one if the conn was rescheduled */
                        kranal_conn_decref(conn);

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }
1722
1723                 if (did_something)
1724                         continue;
1725
1726                 add_wait_queue(&dev->rad_waitq, &wait);
1727                 set_current_state(TASK_INTERRUPTIBLE);
1728
1729                 spin_unlock_irqrestore(&dev->rad_lock, flags);
1730
1731                 busy_loops = 0;
1732                 schedule();
1733
1734                 set_current_state(TASK_RUNNING);
1735                 remove_wait_queue(&dev->rad_waitq, &wait);
1736
1737                 spin_lock_irqsave(&dev->rad_lock, flags);
1738         }
1739
1740         spin_unlock_irqrestore(&dev->rad_lock, flags);
1741
1742         kranal_thread_fini();
1743         return 0;
1744 }
1745
1746
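/* Entry points through which the portals library calls into this NAL */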
1747 lib_nal_t kranal_lib = {
1748         libnal_data:        &kranal_data,      /* NAL private data */
1749         libnal_send:         kranal_send,
1750         libnal_send_pages:   kranal_send_pages,
1751         libnal_recv:         kranal_recv,
1752         libnal_recv_pages:   kranal_recv_pages,
1753         libnal_dist:         kranal_dist
1754 };