lnet/klnds/openiblnd/openiblnd_cb.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "openibnal.h"

/*
 *  LIB functions follow
 *
 */
void
koibnal_schedule_tx_done (koib_tx_t *tx)
{
        unsigned long flags;

        spin_lock_irqsave (&koibnal_data.koib_sched_lock, flags);

        list_add_tail(&tx->tx_list, &koibnal_data.koib_sched_txq);
        wake_up (&koibnal_data.koib_sched_waitq);

        spin_unlock_irqrestore(&koibnal_data.koib_sched_lock, flags);
}
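
/* Return a completed tx to its idle pool: unmap any registered memory
 * (deferring to thread context via koibnal_schedule_tx_done() when called
 * in IRQ context), finalize up to 2 lib messages, drop the connection ref
 * and wake anyone blocked waiting for an idle blocking descriptor. */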

void
koibnal_tx_done (koib_tx_t *tx)
{
        ptl_err_t        ptlrc = (tx->tx_status == 0) ? PTL_OK : PTL_FAIL;
        unsigned long    flags;
        int              i;
        int              rc;

        LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting callback */
        LASSERT (!tx->tx_passive_rdma_wait);    /* mustn't be on ibc_rdma_queue */

        switch (tx->tx_mapped) {
        default:
                LBUG();

        case KOIB_TX_UNMAPPED:
                break;

        case KOIB_TX_MAPPED:
                if (in_interrupt()) {
                        /* can't deregister memory in IRQ context... */
                        koibnal_schedule_tx_done(tx);
                        return;
                }
                rc = ib_memory_deregister(tx->tx_md.md_handle.mr);
                LASSERT (rc == 0);
                tx->tx_mapped = KOIB_TX_UNMAPPED;
                break;

#if OPENIBNAL_FMR
        case KOIB_TX_MAPPED_FMR:
                if (in_interrupt() && tx->tx_status != 0) {
                        /* can't flush FMRs in IRQ context... */
                        koibnal_schedule_tx_done(tx);
                        return;
                }

                rc = ib_fmr_deregister(tx->tx_md.md_handle.fmr);
                LASSERT (rc == 0);

                if (tx->tx_status != 0)
                        ib_fmr_pool_force_flush(koibnal_data.koib_fmr_pool);
                tx->tx_mapped = KOIB_TX_UNMAPPED;
                break;
#endif
        }

        for (i = 0; i < 2; i++) {
                /* tx may have up to 2 libmsgs to finalise */
                if (tx->tx_libmsg[i] == NULL)
                        continue;

                lib_finalize (&koibnal_lib, NULL, tx->tx_libmsg[i], ptlrc);
                tx->tx_libmsg[i] = NULL;
        }

        if (tx->tx_conn != NULL) {
                koibnal_put_conn (tx->tx_conn);
                tx->tx_conn = NULL;
        }

        tx->tx_nsp = 0;
        tx->tx_passive_rdma = 0;
        tx->tx_status = 0;

        spin_lock_irqsave (&koibnal_data.koib_tx_lock, flags);

        if (tx->tx_isnblk) {
                list_add_tail (&tx->tx_list, &koibnal_data.koib_idle_nblk_txs);
        } else {
                list_add_tail (&tx->tx_list, &koibnal_data.koib_idle_txs);
                wake_up (&koibnal_data.koib_idle_tx_waitq);
        }

        spin_unlock_irqrestore (&koibnal_data.koib_tx_lock, flags);
}
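
/* Grab a tx descriptor from the idle pool.  Blocking callers wait until a
 * "normal" descriptor frees up; non-blocking callers (e.g. IRQ context)
 * may dip into the reserved non-blocking pool and get NULL when both
 * pools are exhausted.  A fresh passive RDMA cookie is assigned while the
 * pool lock is still held. */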

koib_tx_t *
koibnal_get_idle_tx (int may_block)
{
        unsigned long    flags;
        koib_tx_t    *tx = NULL;

        for (;;) {
                spin_lock_irqsave (&koibnal_data.koib_tx_lock, flags);

                /* "normal" descriptor is free */
                if (!list_empty (&koibnal_data.koib_idle_txs)) {
                        tx = list_entry (koibnal_data.koib_idle_txs.next,
                                         koib_tx_t, tx_list);
                        break;
                }

                if (!may_block) {
                        /* may dip into reserve pool */
                        if (list_empty (&koibnal_data.koib_idle_nblk_txs)) {
                                CERROR ("reserved tx desc pool exhausted\n");
                                break;
                        }

                        tx = list_entry (koibnal_data.koib_idle_nblk_txs.next,
                                         koib_tx_t, tx_list);
                        break;
                }

                /* block for idle tx */
                spin_unlock_irqrestore (&koibnal_data.koib_tx_lock, flags);

                wait_event (koibnal_data.koib_idle_tx_waitq,
                            !list_empty (&koibnal_data.koib_idle_txs) ||
                            koibnal_data.koib_shutdown);
        }

        if (tx != NULL) {
                list_del (&tx->tx_list);

                /* Allocate a new passive RDMA completion cookie.  It might
                 * not be needed, but we've got a lock right now and we're
                 * unlikely to wrap... */
                tx->tx_passive_rdma_cookie = koibnal_data.koib_next_tx_cookie++;

                LASSERT (tx->tx_mapped == KOIB_TX_UNMAPPED);
                LASSERT (tx->tx_nsp == 0);
                LASSERT (tx->tx_sending == 0);
                LASSERT (tx->tx_status == 0);
                LASSERT (tx->tx_conn == NULL);
                LASSERT (!tx->tx_passive_rdma);
                LASSERT (!tx->tx_passive_rdma_wait);
                LASSERT (tx->tx_libmsg[0] == NULL);
                LASSERT (tx->tx_libmsg[1] == NULL);
        }

        spin_unlock_irqrestore (&koibnal_data.koib_tx_lock, flags);

        return (tx);
}

int
koibnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
{
        /* I would guess that if koibnal_get_peer (nid) == NULL,
           and we're not routing, then 'nid' is very distant :) */
        if ( nal->libnal_ni.ni_pid.nid == nid ) {
                *dist = 0;
        } else {
                *dist = 1;
        }

        return 0;
}
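
/* Match an incoming RDMA-done notification against the passive RDMA
 * cookie of a tx parked on ibc_rdma_queue.  Whoever makes the tx idle
 * (this path or the send completion callback) is the one who frees it. */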

void
koibnal_complete_passive_rdma(koib_conn_t *conn, __u64 cookie, int status)
{
        struct list_head *ttmp;
        unsigned long     flags;
        int               idle;

        spin_lock_irqsave (&conn->ibc_lock, flags);

        list_for_each (ttmp, &conn->ibc_rdma_queue) {
                koib_tx_t *tx = list_entry(ttmp, koib_tx_t, tx_list);

                LASSERT (tx->tx_passive_rdma);
                LASSERT (tx->tx_passive_rdma_wait);

                if (tx->tx_passive_rdma_cookie != cookie)
                        continue;

                CDEBUG(D_NET, "Complete %p "LPD64"\n", tx, cookie);

                list_del (&tx->tx_list);

                tx->tx_passive_rdma_wait = 0;
                idle = (tx->tx_sending == 0);

                tx->tx_status = status;

                spin_unlock_irqrestore (&conn->ibc_lock, flags);

                /* I could be racing with tx callbacks.  It's whoever
                 * _makes_ tx idle that frees it */
                if (idle)
                        koibnal_tx_done (tx);
                return;
        }

        spin_unlock_irqrestore (&conn->ibc_lock, flags);

        CERROR ("Unmatched (late?) RDMA completion "LPX64" from "LPX64"\n",
                cookie, conn->ibc_peer->ibp_nid);
}
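
/* (Re)post a receive buffer on the connection's QP.  When 'do_credits' is
 * set, the buffer being recycled represents a receive credit to return to
 * the peer, so bump ibc_outstanding_credits and kick the send path. */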

void
koibnal_post_rx (koib_rx_t *rx, int do_credits)
{
        koib_conn_t  *conn = rx->rx_conn;
        int           rc;
        unsigned long flags;

        rx->rx_gl = (struct ib_gather_scatter) {
                .address = rx->rx_vaddr,
                .length  = OPENIBNAL_MSG_SIZE,
                .key     = conn->ibc_rx_pages->oibp_lkey,
        };

        rx->rx_sp = (struct ib_receive_param) {
                .work_request_id        = (__u64)(unsigned long)rx,
                .scatter_list           = &rx->rx_gl,
                .num_scatter_entries    = 1,
                .device_specific        = NULL,
                .signaled               = 1,
        };

        LASSERT (conn->ibc_state >= OPENIBNAL_CONN_ESTABLISHED);
        LASSERT (!rx->rx_posted);
        rx->rx_posted = 1;
        mb();

        if (conn->ibc_state != OPENIBNAL_CONN_ESTABLISHED)
                rc = -ECONNABORTED;
        else
                rc = ib_receive (conn->ibc_qp, &rx->rx_sp, 1);

        if (rc == 0) {
                if (do_credits) {
                        spin_lock_irqsave(&conn->ibc_lock, flags);
                        conn->ibc_outstanding_credits++;
                        spin_unlock_irqrestore(&conn->ibc_lock, flags);

                        koibnal_check_sends(conn);
                }
                return;
        }

        if (conn->ibc_state == OPENIBNAL_CONN_ESTABLISHED) {
                CERROR ("Error posting receive -> "LPX64": %d\n",
                        conn->ibc_peer->ibp_nid, rc);
                koibnal_close_conn (rx->rx_conn, rc);
        } else {
                CDEBUG (D_NET, "Error posting receive -> "LPX64": %d\n",
                        conn->ibc_peer->ibp_nid, rc);
        }

        /* Drop rx's ref */
        koibnal_put_conn (conn);
}

#if OPENIBNAL_CKSUM
__u32 koibnal_cksum (void *ptr, int nob)
{
        char  *c  = ptr;
        __u32  sum = 0;

        while (nob-- > 0)
                sum = ((sum << 1) | (sum >> 31)) + *c++;

        return (sum);
}
#endif
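
/* Receive completion handler, called from the CQ event path (possibly IRQ
 * context).  Validates magic/version/length, byte-flips the message if the
 * sender has opposite endianness, banks any returned send credits, then
 * either handles the message inline (NOOP, RDMA completions) or queues
 * the rx for koibnal_rx() in thread context. */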

void
koibnal_rx_callback (struct ib_cq *cq, struct ib_cq_entry *e, void *arg)
{
        koib_rx_t    *rx = (koib_rx_t *)((unsigned long)e->work_request_id);
        koib_msg_t   *msg = rx->rx_msg;
        koib_conn_t  *conn = rx->rx_conn;
        int           nob = e->bytes_transferred;
        const int     base_nob = offsetof(koib_msg_t, oibm_u);
        int           credits;
        int           flipped;
        unsigned long flags;
#if OPENIBNAL_CKSUM
        __u32         msg_cksum;
        __u32         computed_cksum;
#endif

        CDEBUG (D_NET, "rx %p conn %p\n", rx, conn);
        LASSERT (rx->rx_posted);
        rx->rx_posted = 0;
        mb();

        /* receives complete with error in any case after we've started
         * closing the QP */
        if (conn->ibc_state >= OPENIBNAL_CONN_DEATHROW)
                goto failed;

        /* We don't post receives until the conn is established */
        LASSERT (conn->ibc_state == OPENIBNAL_CONN_ESTABLISHED);

        if (e->status != IB_COMPLETION_STATUS_SUCCESS) {
                CERROR("Rx from "LPX64" failed: %d\n",
                       conn->ibc_peer->ibp_nid, e->status);
                goto failed;
        }

        if (nob < base_nob) {
                CERROR ("Short rx from "LPX64": %d\n",
                        conn->ibc_peer->ibp_nid, nob);
                goto failed;
        }

        /* Receiver does any byte flipping if necessary... */

        if (msg->oibm_magic == OPENIBNAL_MSG_MAGIC) {
                flipped = 0;
        } else {
                if (msg->oibm_magic != __swab32(OPENIBNAL_MSG_MAGIC)) {
                        CERROR ("Unrecognised magic: %08x from "LPX64"\n",
                                msg->oibm_magic, conn->ibc_peer->ibp_nid);
                        goto failed;
                }
                flipped = 1;
                __swab16s (&msg->oibm_version);
                LASSERT (sizeof(msg->oibm_type) == 1);
                LASSERT (sizeof(msg->oibm_credits) == 1);
        }

        if (msg->oibm_version != OPENIBNAL_MSG_VERSION) {
                CERROR ("Incompatible msg version %d (%d expected)\n",
                        msg->oibm_version, OPENIBNAL_MSG_VERSION);
                goto failed;
        }

#if OPENIBNAL_CKSUM
        if (nob != msg->oibm_nob) {
                CERROR ("Unexpected # bytes %d (%d expected)\n", nob, msg->oibm_nob);
                goto failed;
        }

        msg_cksum = le32_to_cpu(msg->oibm_cksum);
        msg->oibm_cksum = 0;
        computed_cksum = koibnal_cksum (msg, nob);

        if (msg_cksum != computed_cksum) {
                CERROR ("Checksum failure %d: (%d expected)\n",
                        computed_cksum, msg_cksum);
                goto failed;
        }
        CDEBUG(D_NET, "cksum %x, nob %d\n", computed_cksum, nob);
#endif

        /* Have I received credits that will let me send? */
        credits = msg->oibm_credits;
        if (credits != 0) {
                spin_lock_irqsave(&conn->ibc_lock, flags);
                conn->ibc_credits += credits;
                spin_unlock_irqrestore(&conn->ibc_lock, flags);

                koibnal_check_sends(conn);
        }

        switch (msg->oibm_type) {
        case OPENIBNAL_MSG_NOOP:
                koibnal_post_rx (rx, 1);
                return;

        case OPENIBNAL_MSG_IMMEDIATE:
                if (nob < base_nob + sizeof (koib_immediate_msg_t)) {
                        CERROR ("Short IMMEDIATE from "LPX64": %d\n",
                                conn->ibc_peer->ibp_nid, nob);
                        goto failed;
                }
                break;

        case OPENIBNAL_MSG_PUT_RDMA:
        case OPENIBNAL_MSG_GET_RDMA:
                if (nob < base_nob + sizeof (koib_rdma_msg_t)) {
                        CERROR ("Short RDMA msg from "LPX64": %d\n",
                                conn->ibc_peer->ibp_nid, nob);
                        goto failed;
                }
                if (flipped) {
                        __swab32s(&msg->oibm_u.rdma.oibrm_desc.rd_key);
                        __swab32s(&msg->oibm_u.rdma.oibrm_desc.rd_nob);
                        __swab64s(&msg->oibm_u.rdma.oibrm_desc.rd_addr);
                }
                CDEBUG(D_NET, "%d RDMA: cookie "LPX64", key %x, addr "LPX64", nob %d\n",
                       msg->oibm_type, msg->oibm_u.rdma.oibrm_cookie,
                       msg->oibm_u.rdma.oibrm_desc.rd_key,
                       msg->oibm_u.rdma.oibrm_desc.rd_addr,
                       msg->oibm_u.rdma.oibrm_desc.rd_nob);
                break;

        case OPENIBNAL_MSG_PUT_DONE:
        case OPENIBNAL_MSG_GET_DONE:
                if (nob < base_nob + sizeof (koib_completion_msg_t)) {
                        CERROR ("Short COMPLETION msg from "LPX64": %d\n",
                                conn->ibc_peer->ibp_nid, nob);
                        goto failed;
                }
                if (flipped)
                        __swab32s(&msg->oibm_u.completion.oibcm_status);

                CDEBUG(D_NET, "%d DONE: cookie "LPX64", status %d\n",
                       msg->oibm_type, msg->oibm_u.completion.oibcm_cookie,
                       msg->oibm_u.completion.oibcm_status);

                koibnal_complete_passive_rdma (conn,
                                               msg->oibm_u.completion.oibcm_cookie,
                                               msg->oibm_u.completion.oibcm_status);
                koibnal_post_rx (rx, 1);
                return;

        default:
                CERROR ("Can't parse type from "LPX64": %d\n",
                        conn->ibc_peer->ibp_nid, msg->oibm_type);
                goto failed;
        }

        /* schedule for koibnal_rx() in thread context */
        spin_lock_irqsave(&koibnal_data.koib_sched_lock, flags);

        list_add_tail (&rx->rx_list, &koibnal_data.koib_sched_rxq);
        wake_up (&koibnal_data.koib_sched_waitq);

        spin_unlock_irqrestore(&koibnal_data.koib_sched_lock, flags);
        return;

 failed:
        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        koibnal_close_conn(conn, -ECONNABORTED);

        /* Don't re-post rx & drop its ref on conn */
        koibnal_put_conn(conn);
}
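
/* Thread-context half of receive handling: hand the portals header to
 * lib_parse(), which may initiate the active RDMA (setting rx_rdma), then
 * re-post the receive buffer. */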

void
koibnal_rx (koib_rx_t *rx)
{
        koib_msg_t   *msg = rx->rx_msg;

        /* Clear flag so I can detect if I've sent an RDMA completion */
        rx->rx_rdma = 0;

        switch (msg->oibm_type) {
        case OPENIBNAL_MSG_GET_RDMA:
                lib_parse(&koibnal_lib, &msg->oibm_u.rdma.oibrm_hdr, rx);
                /* If the incoming get was matched, I'll have initiated the
                 * RDMA and the completion message... */
                if (rx->rx_rdma)
                        break;

                /* Otherwise, I'll send a failed completion now to prevent
                 * the peer's GET blocking for the full timeout. */
                CERROR ("Completing unmatched RDMA GET from "LPX64"\n",
                        rx->rx_conn->ibc_peer->ibp_nid);
                koibnal_start_active_rdma (OPENIBNAL_MSG_GET_DONE, -EIO,
                                           rx, NULL, 0, NULL, NULL, 0, 0);
                break;

        case OPENIBNAL_MSG_PUT_RDMA:
                lib_parse(&koibnal_lib, &msg->oibm_u.rdma.oibrm_hdr, rx);
                if (rx->rx_rdma)
                        break;
                /* This is most unusual, since even if lib_parse() didn't
                 * match anything, it should have asked us to read (and
                 * discard) the payload.  The portals header must be
                 * inconsistent with this message type, so it's the
                 * sender's fault for sending garbage and she can time
                 * herself out... */
                CERROR ("Uncompleted RDMA PUT from "LPX64"\n",
                        rx->rx_conn->ibc_peer->ibp_nid);
                break;

        case OPENIBNAL_MSG_IMMEDIATE:
                lib_parse(&koibnal_lib, &msg->oibm_u.immediate.oibim_hdr, rx);
                LASSERT (!rx->rx_rdma);
                break;

        default:
                LBUG();
                break;
        }

        koibnal_post_rx (rx, 1);
}

#if 0
int
koibnal_kvaddr_to_phys (unsigned long vaddr, __u64 *physp)
{
        struct page *page;

        if (vaddr >= VMALLOC_START &&
            vaddr < VMALLOC_END)
                page = vmalloc_to_page ((void *)vaddr);
#if CONFIG_HIGHMEM
        else if (vaddr >= PKMAP_BASE &&
                 vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
                page = vmalloc_to_page ((void *)vaddr);
                /* in 2.4 ^ just walks the page tables */
#endif
        else
                page = virt_to_page (vaddr);

        if (page == NULL ||
            !VALID_PAGE (page))
                return (-EFAULT);

        *physp = koibnal_page2phys(page) + (vaddr & (PAGE_SIZE - 1));
        return (0);
}
#endif
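
/* Register a contiguous virtual-address fragment for RDMA.  Only a single
 * iovec fragment can be mapped; the memory handle and lkey/rkey are
 * stashed in tx->tx_md for the send path to use. */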

int
koibnal_map_iov (koib_tx_t *tx, enum ib_memory_access access,
                 int niov, struct iovec *iov, int offset, int nob)
{
        void   *vaddr;
        int     rc;

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_mapped == KOIB_TX_UNMAPPED);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR ("Can't map multiple vaddr fragments\n");
                return (-EMSGSIZE);
        }

        vaddr = (void *)(((unsigned long)iov->iov_base) + offset);
        tx->tx_md.md_addr = (__u64)((unsigned long)vaddr);

        rc = ib_memory_register (koibnal_data.koib_pd,
                                 vaddr, nob,
                                 access,
                                 &tx->tx_md.md_handle.mr,
                                 &tx->tx_md.md_lkey,
                                 &tx->tx_md.md_rkey);

        if (rc != 0) {
                CERROR ("Can't map vaddr: %d\n", rc);
                return (rc);
        }

        tx->tx_mapped = KOIB_TX_MAPPED;
        return (0);
}
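
/* Register a page-vector (kiov) payload for RDMA.  The pages must form
 * one contiguous region in I/O virtual memory (no gaps); they are
 * assembled into a temporary physical-address array and registered via
 * the FMR pool or as physical memory, depending on OPENIBNAL_FMR. */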

int
koibnal_map_kiov (koib_tx_t *tx, enum ib_memory_access access,
                  int nkiov, ptl_kiov_t *kiov,
                  int offset, int nob)
{
#if OPENIBNAL_FMR
        __u64                      *phys;
        const int                   mapped = KOIB_TX_MAPPED_FMR;
#else
        struct ib_physical_buffer  *phys;
        const int                   mapped = KOIB_TX_MAPPED;
#endif
        int                         page_offset;
        int                         nphys;
        int                         resid;
        int                         phys_size;
        int                         rc;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_mapped == KOIB_TX_UNMAPPED);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        phys_size = nkiov * sizeof (*phys);
        PORTAL_ALLOC(phys, phys_size);
        if (phys == NULL) {
                CERROR ("Can't allocate tmp phys\n");
                return (-ENOMEM);
        }

        page_offset = kiov->kiov_offset + offset;
#if OPENIBNAL_FMR
        phys[0] = koibnal_page2phys(kiov->kiov_page);
#else
        phys[0].address = koibnal_page2phys(kiov->kiov_page);
        phys[0].size = PAGE_SIZE;
#endif
        nphys = 1;
        resid = nob - (kiov->kiov_len - offset);

        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        int i;
                        /* Can't have gaps */
                        CERROR ("Can't make payload contiguous in I/O VM: "
                                "page %d, offset %d, len %d\n", nphys,
                                kiov->kiov_offset, kiov->kiov_len);

                        for (i = -nphys; i < nkiov; i++) {
                                CERROR("kiov[%d] %p +%d for %d\n",
                                       i, kiov[i].kiov_page,
                                       kiov[i].kiov_offset, kiov[i].kiov_len);
                        }

                        rc = -EINVAL;
                        goto out;
                }

                if (nphys == PTL_MD_MAX_IOV) {
                        CERROR ("payload too big (%d)\n", nphys);
                        rc = -EMSGSIZE;
                        goto out;
                }

                LASSERT (nphys * sizeof (*phys) < phys_size);
#if OPENIBNAL_FMR
                phys[nphys] = koibnal_page2phys(kiov->kiov_page);
#else
                phys[nphys].address = koibnal_page2phys(kiov->kiov_page);
                phys[nphys].size = PAGE_SIZE;
#endif
                nphys++;

                resid -= PAGE_SIZE;
        }

#if 0
        CWARN ("nphys %d, nob %d, page_offset %d\n", nphys, nob, page_offset);
        for (rc = 0; rc < nphys; rc++)
                CWARN ("   [%d] "LPX64" / %d\n", rc, phys[rc].address, phys[rc].size);
#endif
        tx->tx_md.md_addr = OPENIBNAL_RDMA_BASE;

#if OPENIBNAL_FMR
        rc = ib_fmr_register_physical (koibnal_data.koib_fmr_pool,
                                       phys, nphys,
                                       &tx->tx_md.md_addr,
                                       page_offset,
                                       &tx->tx_md.md_handle.fmr,
                                       &tx->tx_md.md_lkey,
                                       &tx->tx_md.md_rkey);
#else
        rc = ib_memory_register_physical (koibnal_data.koib_pd,
                                          phys, nphys,
                                          &tx->tx_md.md_addr,
                                          nob, page_offset,
                                          access,
                                          &tx->tx_md.md_handle.mr,
                                          &tx->tx_md.md_lkey,
                                          &tx->tx_md.md_rkey);
#endif
        if (rc == 0) {
                CDEBUG(D_NET, "Mapped %d pages %d bytes @ offset %d: lkey %x, rkey %x\n",
                       nphys, nob, page_offset, tx->tx_md.md_lkey, tx->tx_md.md_rkey);
                tx->tx_mapped = mapped;
        } else {
                CERROR ("Can't map phys: %d\n", rc);
                rc = -EFAULT;
        }

 out:
        PORTAL_FREE(phys, phys_size);
        return (rc);
}

koib_conn_t *
koibnal_find_conn_locked (koib_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->ibp_conns) {
                return (list_entry(tmp, koib_conn_t, ibc_list));
        }

        return (NULL);
}
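
/* Push queued sends on 'conn' as far as the send window and credit flow
 * control allow.  Each message sent consumes one of our send credits and
 * piggy-backs all outstanding receive credits back to the peer; the last
 * credit is reserved for returning credits (via a NOOP if the queue is
 * empty).  Called whenever credits or queue space may have freed up. */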

void
koibnal_check_sends (koib_conn_t *conn)
{
        unsigned long   flags;
        koib_tx_t      *tx;
        int             rc;
        int             i;
        int             done;
        int             nwork;

        spin_lock_irqsave (&conn->ibc_lock, flags);

        if (list_empty(&conn->ibc_tx_queue) &&
            conn->ibc_outstanding_credits >= OPENIBNAL_CREDIT_HIGHWATER) {
                spin_unlock_irqrestore(&conn->ibc_lock, flags);

                tx = koibnal_get_idle_tx(0);     /* don't block */
                if (tx != NULL)
                        koibnal_init_tx_msg(tx, OPENIBNAL_MSG_NOOP, 0);

                spin_lock_irqsave(&conn->ibc_lock, flags);

                if (tx != NULL) {
                        atomic_inc(&conn->ibc_refcount);
                        koibnal_queue_tx_locked(tx, conn);
                }
        }

        LASSERT (conn->ibc_nsends_posted <= OPENIBNAL_MSG_QUEUE_SIZE);

        while (!list_empty (&conn->ibc_tx_queue)) {
                tx = list_entry (conn->ibc_tx_queue.next, koib_tx_t, tx_list);

                /* We rely on this for QP sizing */
                LASSERT (tx->tx_nsp > 0 && tx->tx_nsp <= 2);

                LASSERT (conn->ibc_outstanding_credits >= 0);
                LASSERT (conn->ibc_outstanding_credits <= OPENIBNAL_MSG_QUEUE_SIZE);
                LASSERT (conn->ibc_credits >= 0);
                LASSERT (conn->ibc_credits <= OPENIBNAL_MSG_QUEUE_SIZE);

                /* Not on ibc_rdma_queue */
                LASSERT (!tx->tx_passive_rdma_wait);

                if (conn->ibc_nsends_posted == OPENIBNAL_MSG_QUEUE_SIZE)
                        break;

                if (conn->ibc_credits == 0)     /* no credits */
                        break;

                if (conn->ibc_credits == 1 &&   /* last credit reserved for */
                    conn->ibc_outstanding_credits == 0) /* giving back credits */
                        break;

                list_del (&tx->tx_list);

                if (tx->tx_msg->oibm_type == OPENIBNAL_MSG_NOOP &&
                    (!list_empty(&conn->ibc_tx_queue) ||
                     conn->ibc_outstanding_credits < OPENIBNAL_CREDIT_HIGHWATER)) {
                        /* Redundant NOOP */
                        spin_unlock_irqrestore(&conn->ibc_lock, flags);
                        koibnal_tx_done(tx);
                        spin_lock_irqsave(&conn->ibc_lock, flags);
                        continue;
                }

                /* incoming RDMA completion can find this one now */
                if (tx->tx_passive_rdma) {
                        list_add (&tx->tx_list, &conn->ibc_rdma_queue);
                        tx->tx_passive_rdma_wait = 1;
                        tx->tx_passive_rdma_deadline =
                                jiffies + koibnal_tunables.koib_io_timeout * HZ;
                }

                tx->tx_msg->oibm_credits = conn->ibc_outstanding_credits;
                conn->ibc_outstanding_credits = 0;

                /* use the free memory barrier when we unlock to ensure
                 * sending set before we can get the tx callback. */
                conn->ibc_nsends_posted++;
                conn->ibc_credits--;
                tx->tx_sending = tx->tx_nsp;

#if OPENIBNAL_CKSUM
                tx->tx_msg->oibm_cksum = 0;
                tx->tx_msg->oibm_cksum = koibnal_cksum(tx->tx_msg, tx->tx_msg->oibm_nob);
                CDEBUG(D_NET, "cksum %x, nob %d\n", tx->tx_msg->oibm_cksum, tx->tx_msg->oibm_nob);
#endif
                spin_unlock_irqrestore (&conn->ibc_lock, flags);

                /* NB the gap between removing tx from the queue and sending it
                 * allows message re-ordering to occur */

                LASSERT (tx->tx_nsp > 0);

                rc = -ECONNABORTED;
                nwork = 0;
                if (conn->ibc_state == OPENIBNAL_CONN_ESTABLISHED) {
                        tx->tx_status = 0;
                        /* Driver only accepts 1 item at a time */
                        for (i = 0; i < tx->tx_nsp; i++) {
                                rc = ib_send (conn->ibc_qp, &tx->tx_sp[i], 1);
                                if (rc != 0)
                                        break;
                                nwork++;
                        }
                }

                spin_lock_irqsave (&conn->ibc_lock, flags);
                if (rc != 0) {
                        /* NB credits are transferred in the actual
                         * message, which can only be the last work item */
                        conn->ibc_outstanding_credits += tx->tx_msg->oibm_credits;
                        conn->ibc_credits++;
                        conn->ibc_nsends_posted--;
                        tx->tx_sending -= tx->tx_nsp - nwork;
                        tx->tx_status = rc;
                        done = (tx->tx_sending == 0);

                        if (tx->tx_passive_rdma) {
                                tx->tx_passive_rdma_wait = 0;
                                list_del (&tx->tx_list);
                        }

                        spin_unlock_irqrestore (&conn->ibc_lock, flags);

                        if (conn->ibc_state == OPENIBNAL_CONN_ESTABLISHED)
                                CERROR ("Error %d posting transmit to "LPX64"\n",
                                        rc, conn->ibc_peer->ibp_nid);
                        else
                                CDEBUG (D_NET, "Error %d posting transmit to "
                                        LPX64"\n", rc, conn->ibc_peer->ibp_nid);

                        koibnal_close_conn (conn, rc);

                        if (done)
                                koibnal_tx_done (tx);
                        return;
                }
        }

        spin_unlock_irqrestore (&conn->ibc_lock, flags);
}

void
koibnal_tx_callback (struct ib_cq *cq, struct ib_cq_entry *e, void *arg)
{
        koib_tx_t    *tx = (koib_tx_t *)((unsigned long)e->work_request_id);
        koib_conn_t  *conn;
        unsigned long flags;
        int           idle;

        conn = tx->tx_conn;
        LASSERT (conn != NULL);
        LASSERT (tx->tx_sending != 0);

        spin_lock_irqsave(&conn->ibc_lock, flags);

        CDEBUG(D_NET, "conn %p tx %p [%d/%d]: %d\n", conn, tx,
               tx->tx_nsp - tx->tx_sending, tx->tx_nsp,
               e->status);

        /* I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'.  If it's
         * not me, then I take an extra ref on conn so it can't disappear
         * under me. */

        tx->tx_sending--;
        idle = (tx->tx_sending == 0) &&         /* This is the final callback */
               (!tx->tx_passive_rdma_wait);     /* Not waiting for RDMA completion */

        CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
               conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
               atomic_read (&conn->ibc_refcount));
        atomic_inc (&conn->ibc_refcount);

        if (tx->tx_sending == 0)
                conn->ibc_nsends_posted--;

        if (e->status != IB_COMPLETION_STATUS_SUCCESS &&
            tx->tx_status == 0)
                tx->tx_status = -ECONNABORTED;

        spin_unlock_irqrestore(&conn->ibc_lock, flags);

        if (idle)
                koibnal_tx_done (tx);

        if (e->status != IB_COMPLETION_STATUS_SUCCESS) {
                CERROR ("Tx completion to "LPX64" failed: %d\n",
                        conn->ibc_peer->ibp_nid, e->status);
                koibnal_close_conn (conn, -ENETDOWN);
        } else {
                /* can I shovel some more sends out the door? */
                koibnal_check_sends(conn);
        }

        koibnal_put_conn (conn);
}
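
/* Append a SEND work item carrying the message header plus body_nob bytes
 * of message body to the tx.  The send is fenced behind any RDMA read
 * already bundled in the same tx, so a PUT_DONE can't overtake its data. */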

void
koibnal_init_tx_msg (koib_tx_t *tx, int type, int body_nob)
{
        struct ib_gather_scatter *gl = &tx->tx_gl[tx->tx_nsp];
        struct ib_send_param     *sp = &tx->tx_sp[tx->tx_nsp];
        int                       fence;
        int                       nob = offsetof (koib_msg_t, oibm_u) + body_nob;

        LASSERT (tx->tx_nsp >= 0 &&
                 tx->tx_nsp < sizeof(tx->tx_sp)/sizeof(tx->tx_sp[0]));
        LASSERT (nob <= OPENIBNAL_MSG_SIZE);

        tx->tx_msg->oibm_magic = OPENIBNAL_MSG_MAGIC;
        tx->tx_msg->oibm_version = OPENIBNAL_MSG_VERSION;
        tx->tx_msg->oibm_type = type;
#if OPENIBNAL_CKSUM
        tx->tx_msg->oibm_nob = nob;
#endif
        /* Fence the message if it's bundled with an RDMA read */
        fence = (tx->tx_nsp > 0) &&
                (type == OPENIBNAL_MSG_PUT_DONE);

        *gl = (struct ib_gather_scatter) {
                .address = tx->tx_vaddr,
                .length  = nob,
                .key     = koibnal_data.koib_tx_pages->oibp_lkey,
        };

        /* NB If this is an RDMA read, the completion message must wait for
         * the RDMA to complete.  Sends wait for previous RDMA writes
         * anyway... */
        *sp = (struct ib_send_param) {
                .work_request_id      = (__u64)((unsigned long)tx),
                .op                   = IB_OP_SEND,
                .gather_list          = gl,
                .num_gather_entries   = 1,
                .device_specific      = NULL,
                .solicited_event      = 1,
                .signaled             = 1,
                .immediate_data_valid = 0,
                .fence                = fence,
                .inline_data          = 0,
        };

        tx->tx_nsp++;
}

void
koibnal_queue_tx (koib_tx_t *tx, koib_conn_t *conn)
{
        unsigned long         flags;

        spin_lock_irqsave(&conn->ibc_lock, flags);

        koibnal_queue_tx_locked (tx, conn);

        spin_unlock_irqrestore(&conn->ibc_lock, flags);

        koibnal_check_sends(conn);
}
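
/* Commit a fully-initialised tx to the wire.  Looks the peer up under the
 * global read lock and queues on an existing connection if there is one;
 * otherwise retakes the lock in write mode, hands the peer to the connd
 * to establish a connection, and parks the tx on the peer's queue.  Any
 * failure completes the tx with an error. */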

void
koibnal_launch_tx (koib_tx_t *tx, ptl_nid_t nid)
{
        unsigned long    flags;
        koib_peer_t     *peer;
        koib_conn_t     *conn;
        rwlock_t        *g_lock = &koibnal_data.koib_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
        LASSERT (tx->tx_nsp > 0);               /* work items have been set up */

        read_lock (g_lock);

        peer = koibnal_find_peer_locked (nid);
        if (peer == NULL) {
                read_unlock (g_lock);
                tx->tx_status = -EHOSTUNREACH;
                koibnal_tx_done (tx);
                return;
        }

        conn = koibnal_find_conn_locked (peer);
        if (conn != NULL) {
                CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
                       conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
                       atomic_read (&conn->ibc_refcount));
                atomic_inc (&conn->ibc_refcount); /* 1 ref for the tx */
                read_unlock (g_lock);

                koibnal_queue_tx (tx, conn);
                return;
        }

        /* Making one or more connections; I'll need a write lock... */
        read_unlock (g_lock);
        write_lock_irqsave (g_lock, flags);

        peer = koibnal_find_peer_locked (nid);
        if (peer == NULL) {
                write_unlock_irqrestore (g_lock, flags);
                tx->tx_status = -EHOSTUNREACH;
                koibnal_tx_done (tx);
                return;
        }

        conn = koibnal_find_conn_locked (peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
                       conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
                       atomic_read (&conn->ibc_refcount));
                atomic_inc (&conn->ibc_refcount); /* 1 ref for the tx */
                write_unlock_irqrestore (g_lock, flags);

                koibnal_queue_tx (tx, conn);
                return;
        }

        if (peer->ibp_connecting == 0) {
                if (!time_after_eq(jiffies, peer->ibp_reconnect_time)) {
                        write_unlock_irqrestore (g_lock, flags);
                        tx->tx_status = -EHOSTUNREACH;
                        koibnal_tx_done (tx);
                        return;
                }

                peer->ibp_connecting = 1;
                atomic_inc (&peer->ibp_refcount); /* extra ref for connd */

                spin_lock (&koibnal_data.koib_connd_lock);

                list_add_tail (&peer->ibp_connd_list,
                               &koibnal_data.koib_connd_peers);
                wake_up (&koibnal_data.koib_connd_waitq);

                spin_unlock (&koibnal_data.koib_connd_lock);
        }

        /* A connection is being established; queue the message... */
        list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);

        write_unlock_irqrestore (g_lock, flags);
}
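
/* Set up the passive side of an RDMA: map the local buffer, advertise its
 * rkey/addr/length in a PUT_RDMA or GET_RDMA message and let the peer
 * drive the actual data movement.  The tx then waits on ibc_rdma_queue
 * for the matching DONE completion, identified by the cookie. */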

ptl_err_t
koibnal_start_passive_rdma (int type, ptl_nid_t nid,
                            lib_msg_t *libmsg, ptl_hdr_t *hdr)
{
        int         nob = libmsg->md->length;
        koib_tx_t  *tx;
        koib_msg_t *oibmsg;
        int         rc;
        int         access;

        LASSERT (type == OPENIBNAL_MSG_PUT_RDMA ||
                 type == OPENIBNAL_MSG_GET_RDMA);
        LASSERT (nob > 0);
        LASSERT (!in_interrupt());              /* Mapping could block */

        if (type == OPENIBNAL_MSG_PUT_RDMA) {
                access = IB_ACCESS_REMOTE_READ;
        } else {
                access = IB_ACCESS_REMOTE_WRITE |
                         IB_ACCESS_LOCAL_WRITE;
        }

        tx = koibnal_get_idle_tx (1);           /* May block; caller is an app thread */
        LASSERT (tx != NULL);

        if ((libmsg->md->options & PTL_MD_KIOV) == 0)
                rc = koibnal_map_iov (tx, access,
                                      libmsg->md->md_niov,
                                      libmsg->md->md_iov.iov,
                                      0, nob);
        else
                rc = koibnal_map_kiov (tx, access,
                                       libmsg->md->md_niov,
                                       libmsg->md->md_iov.kiov,
                                       0, nob);

        if (rc != 0) {
                CERROR ("Can't map RDMA for "LPX64": %d\n", nid, rc);
                goto failed;
        }

        if (type == OPENIBNAL_MSG_GET_RDMA) {
                /* reply gets finalized when tx completes */
                tx->tx_libmsg[1] = lib_create_reply_msg(&koibnal_lib,
                                                        nid, libmsg);
                if (tx->tx_libmsg[1] == NULL) {
                        CERROR ("Can't create reply for GET -> "LPX64"\n",
                                nid);
                        rc = -ENOMEM;
                        goto failed;
                }
        }

        tx->tx_passive_rdma = 1;

        oibmsg = tx->tx_msg;

        oibmsg->oibm_u.rdma.oibrm_hdr = *hdr;
        oibmsg->oibm_u.rdma.oibrm_cookie = tx->tx_passive_rdma_cookie;
        oibmsg->oibm_u.rdma.oibrm_desc.rd_key = tx->tx_md.md_rkey;
        oibmsg->oibm_u.rdma.oibrm_desc.rd_addr = tx->tx_md.md_addr;
        oibmsg->oibm_u.rdma.oibrm_desc.rd_nob = nob;

        koibnal_init_tx_msg (tx, type, sizeof (koib_rdma_msg_t));

        CDEBUG(D_NET, "Passive: %p cookie "LPX64", key %x, addr "
               LPX64", nob %d\n",
               tx, tx->tx_passive_rdma_cookie, tx->tx_md.md_rkey,
               tx->tx_md.md_addr, nob);

        /* libmsg gets finalized when tx completes. */
        tx->tx_libmsg[0] = libmsg;

        koibnal_launch_tx(tx, nid);
        return (PTL_OK);

 failed:
        tx->tx_status = rc;
        koibnal_tx_done (tx);
        return (PTL_FAIL);
}
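
/* Drive the active side of an RDMA: map the local buffer, post the RDMA
 * read (for PUT) or write (for GET) against the peer's advertised
 * rkey/addr, and bundle the PUT_DONE/GET_DONE completion message in the
 * same tx so it goes out once the transfer finishes.  With nob == 0 (or
 * if mapping fails) no data moves and only the completion is sent. */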

void
koibnal_start_active_rdma (int type, int status,
                           koib_rx_t *rx, lib_msg_t *libmsg,
                           unsigned int niov,
                           struct iovec *iov, ptl_kiov_t *kiov,
                           size_t offset, size_t nob)
{
        koib_msg_t   *rxmsg = rx->rx_msg;
        koib_msg_t   *txmsg;
        koib_tx_t    *tx;
        int           access;
        int           rdma_op;
        int           rc;

        CDEBUG(D_NET, "type %d, status %d, niov %d, offset %d, nob %d\n",
               type, status, niov, offset, nob);

        /* Called by scheduler */
        LASSERT (!in_interrupt ());

        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        /* No data if we're completing with failure */
        LASSERT (status == 0 || nob == 0);

        LASSERT (type == OPENIBNAL_MSG_GET_DONE ||
                 type == OPENIBNAL_MSG_PUT_DONE);

        /* Flag I'm completing the RDMA.  Even if I fail to send the
         * completion message, I will have tried my best so further
         * attempts shouldn't be tried. */
        LASSERT (!rx->rx_rdma);
        rx->rx_rdma = 1;

        if (type == OPENIBNAL_MSG_GET_DONE) {
                access   = 0;
                rdma_op  = IB_OP_RDMA_WRITE;
                LASSERT (rxmsg->oibm_type == OPENIBNAL_MSG_GET_RDMA);
        } else {
                access   = IB_ACCESS_LOCAL_WRITE;
                rdma_op  = IB_OP_RDMA_READ;
                LASSERT (rxmsg->oibm_type == OPENIBNAL_MSG_PUT_RDMA);
        }

        tx = koibnal_get_idle_tx (0);           /* Mustn't block */
        if (tx == NULL) {
                CERROR ("tx descs exhausted on RDMA from "LPX64
                        " completing locally with failure\n",
                        rx->rx_conn->ibc_peer->ibp_nid);
                lib_finalize (&koibnal_lib, NULL, libmsg, PTL_NO_SPACE);
                return;
        }
        LASSERT (tx->tx_nsp == 0);

        if (nob != 0) {
                /* We actually need to transfer some data (the transfer
                 * size could get truncated to zero when the incoming
                 * message is matched) */

                if (kiov != NULL)
                        rc = koibnal_map_kiov (tx, access,
                                               niov, kiov, offset, nob);
                else
                        rc = koibnal_map_iov (tx, access,
                                              niov, iov, offset, nob);

                if (rc != 0) {
                        CERROR ("Can't map RDMA -> "LPX64": %d\n",
                                rx->rx_conn->ibc_peer->ibp_nid, rc);
                        /* We'll skip the RDMA and complete with failure. */
                        status = rc;
                        nob = 0;
                } else {
                        tx->tx_gl[0] = (struct ib_gather_scatter) {
                                .address = tx->tx_md.md_addr,
                                .length  = nob,
                                .key     = tx->tx_md.md_lkey,
                        };

                        tx->tx_sp[0] = (struct ib_send_param) {
                                .work_request_id      = (__u64)((unsigned long)tx),
                                .op                   = rdma_op,
                                .gather_list          = &tx->tx_gl[0],
                                .num_gather_entries   = 1,
                                .remote_address       = rxmsg->oibm_u.rdma.oibrm_desc.rd_addr,
                                .rkey                 = rxmsg->oibm_u.rdma.oibrm_desc.rd_key,
                                .device_specific      = NULL,
                                .solicited_event      = 0,
                                .signaled             = 1,
                                .immediate_data_valid = 0,
                                .fence                = 0,
                                .inline_data          = 0,
                        };

                        tx->tx_nsp = 1;
                }
        }

        txmsg = tx->tx_msg;

        txmsg->oibm_u.completion.oibcm_cookie = rxmsg->oibm_u.rdma.oibrm_cookie;
        txmsg->oibm_u.completion.oibcm_status = status;

        koibnal_init_tx_msg(tx, type, sizeof (koib_completion_msg_t));

        if (status == 0 && nob != 0) {
                LASSERT (tx->tx_nsp > 1);
                /* RDMA: libmsg gets finalized when the tx completes.  This
                 * is after the completion message has been sent, which in
                 * turn is after the RDMA has finished. */
                tx->tx_libmsg[0] = libmsg;
        } else {
                LASSERT (tx->tx_nsp == 1);
                /* No RDMA: local completion happens now! */
                CDEBUG(D_WARNING,"No data: immediate completion\n");
                lib_finalize (&koibnal_lib, NULL, libmsg,
                              status == 0 ? PTL_OK : PTL_FAIL);
        }

        /* +1 ref for this tx... */
        CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
               rx->rx_conn, rx->rx_conn->ibc_state,
               rx->rx_conn->ibc_peer->ibp_nid,
               atomic_read (&rx->rx_conn->ibc_refcount));
        atomic_inc (&rx->rx_conn->ibc_refcount);
        /* ...and queue it up */
        koibnal_queue_tx(tx, rx->rx_conn);
}
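
/* Common send path for all outgoing portals messages.  Anything that fits
 * in OPENIBNAL_MSG_SIZE goes as an IMMEDIATE message with the payload
 * copied inline; larger PUTs and GETs negotiate a passive RDMA instead. */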

ptl_err_t
koibnal_sendmsg(lib_nal_t    *nal,
                void         *private,
                lib_msg_t    *libmsg,
                ptl_hdr_t    *hdr,
                int           type,
                ptl_nid_t     nid,
                ptl_pid_t     pid,
                unsigned int  payload_niov,
                struct iovec *payload_iov,
                ptl_kiov_t   *payload_kiov,
                size_t        payload_offset,
                size_t        payload_nob)
{
        koib_msg_t *oibmsg;
        koib_tx_t  *tx;
        int         nob;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
               " pid %d\n", payload_nob, payload_niov, nid, pid);

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= PTL_MD_MAX_IOV);

        /* Thread context if we're sending payload */
        LASSERT (!in_interrupt() || payload_niov == 0);
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        switch (type) {
        default:
                LBUG();
                return (PTL_FAIL);

        case PTL_MSG_REPLY: {
                /* reply's 'private' is the incoming receive */
                koib_rx_t *rx = private;

                /* RDMA reply expected? */
                if (rx->rx_msg->oibm_type == OPENIBNAL_MSG_GET_RDMA) {
                        koibnal_start_active_rdma(OPENIBNAL_MSG_GET_DONE, 0,
                                                  rx, libmsg, payload_niov,
                                                  payload_iov, payload_kiov,
                                                  payload_offset, payload_nob);
                        return (PTL_OK);
                }

                /* Incoming message consistent with immediate reply? */
                if (rx->rx_msg->oibm_type != OPENIBNAL_MSG_IMMEDIATE) {
                        CERROR ("REPLY to "LPX64" bad oibm type %d!!!\n",
1346                                 nid, rx->rx_msg->oibm_type);
1347                         return (PTL_FAIL);
1348                 }
1349
1350                 /* Will it fit in a message? */
1351                 nob = offsetof(koib_msg_t, oibm_u.immediate.oibim_payload[payload_nob]);
1352                 if (nob >= OPENIBNAL_MSG_SIZE) {
1353                         CERROR("REPLY for "LPX64" too big (RDMA not requested): %d\n", 
1354                                nid, payload_nob);
1355                         return (PTL_FAIL);
1356                 }
1357                 break;
1358         }
1359
1360         case PTL_MSG_GET:
1361                 /* might the REPLY message be big enough to need RDMA? */
1362                 nob = offsetof(koib_msg_t, oibm_u.immediate.oibim_payload[libmsg->md->length]);
1363                 if (nob > OPENIBNAL_MSG_SIZE)
1364                         return (koibnal_start_passive_rdma(OPENIBNAL_MSG_GET_RDMA, 
1365                                                            nid, libmsg, hdr));
1366                 break;
1367
1368         case PTL_MSG_ACK:
1369                 LASSERT (payload_nob == 0);
1370                 break;
1371
1372         case PTL_MSG_PUT:
1373                 /* Is the payload big enough to need RDMA? */
1374                 nob = offsetof(koib_msg_t, oibm_u.immediate.oibim_payload[payload_nob]);
1375                 if (nob > OPENIBNAL_MSG_SIZE)
1376                         return (koibnal_start_passive_rdma(OPENIBNAL_MSG_PUT_RDMA,
1377                                                            nid, libmsg, hdr));
1378                 
1379                 break;
1380         }
1381
1382         tx = koibnal_get_idle_tx(!(type == PTL_MSG_ACK ||
1383                                    type == PTL_MSG_REPLY ||
1384                                    in_interrupt()));
1385         if (tx == NULL) {
1386                 CERROR ("Can't send %d to "LPX64": tx descs exhausted%s\n", 
1387                         type, nid, in_interrupt() ? " (intr)" : "");
1388                 return (PTL_NO_SPACE);
1389         }
1390
1391         oibmsg = tx->tx_msg;
1392         oibmsg->oibm_u.immediate.oibim_hdr = *hdr;
1393
1394         if (payload_nob > 0) {
1395                 if (payload_kiov != NULL)
1396                         lib_copy_kiov2buf(oibmsg->oibm_u.immediate.oibim_payload,
1397                                           payload_niov, payload_kiov,
1398                                           payload_offset, payload_nob);
1399                 else
1400                         lib_copy_iov2buf(oibmsg->oibm_u.immediate.oibim_payload,
1401                                          payload_niov, payload_iov,
1402                                          payload_offset, payload_nob);
1403         }
1404
1405         koibnal_init_tx_msg (tx, OPENIBNAL_MSG_IMMEDIATE,
1406                              offsetof(koib_immediate_msg_t, 
1407                                       oibim_payload[payload_nob]));
1408
1409         /* libmsg gets finalized when tx completes */
1410         tx->tx_libmsg[0] = libmsg;
1411
1412         koibnal_launch_tx(tx, nid);
1413         return (PTL_OK);
1414 }
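
/* A minimal sketch (illustrative only, not compiled) of the size test
 * used above to choose between an immediate send and passive RDMA: the
 * payload travels inline iff the whole message, header included, fits
 * in one pre-posted OPENIBNAL_MSG_SIZE buffer.  The helper name is
 * hypothetical; the real code inlines the test per message type. */
#if 0
static int
koibnal_fits_immediate (int payload_nob)
{
        int nob = offsetof(koib_msg_t,
                           oibm_u.immediate.oibim_payload[payload_nob]);

        return (nob <= OPENIBNAL_MSG_SIZE);
}
#endif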

ptl_err_t
koibnal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
               ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
               unsigned int payload_niov, struct iovec *payload_iov,
               size_t payload_offset, size_t payload_len)
{
        return (koibnal_sendmsg(nal, private, cookie,
                                 hdr, type, nid, pid,
                                 payload_niov, payload_iov, NULL,
                                 payload_offset, payload_len));
}

ptl_err_t
koibnal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
                     ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                     unsigned int payload_niov, ptl_kiov_t *payload_kiov,
                     size_t payload_offset, size_t payload_len)
{
        return (koibnal_sendmsg(nal, private, cookie,
                                 hdr, type, nid, pid,
                                 payload_niov, NULL, payload_kiov,
                                 payload_offset, payload_len));
}

ptl_err_t
koibnal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
                 unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
                 size_t offset, size_t mlen, size_t rlen)
{
        koib_rx_t                *rx = private;
        koib_msg_t               *rxmsg = rx->rx_msg;
        int                       msg_nob;

        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt ());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch (rxmsg->oibm_type) {
        default:
                LBUG();
                return (PTL_FAIL);

        case OPENIBNAL_MSG_IMMEDIATE:
                msg_nob = offsetof(koib_msg_t, oibm_u.immediate.oibim_payload[rlen]);
                if (msg_nob > OPENIBNAL_MSG_SIZE) {
                        CERROR ("Immediate message from "LPX64" too big: %d\n",
                                rxmsg->oibm_u.immediate.oibim_hdr.src_nid, rlen);
                        return (PTL_FAIL);
                }

                if (kiov != NULL)
                        lib_copy_buf2kiov(niov, kiov, offset,
                                          rxmsg->oibm_u.immediate.oibim_payload,
                                          mlen);
                else
                        lib_copy_buf2iov(niov, iov, offset,
                                         rxmsg->oibm_u.immediate.oibim_payload,
                                         mlen);

                lib_finalize (nal, NULL, libmsg, PTL_OK);
                return (PTL_OK);

        case OPENIBNAL_MSG_GET_RDMA:
                /* We get called here just to discard any junk after the
                 * GET hdr. */
                LASSERT (libmsg == NULL);
                lib_finalize (nal, NULL, libmsg, PTL_OK);
                return (PTL_OK);

        case OPENIBNAL_MSG_PUT_RDMA:
                koibnal_start_active_rdma (OPENIBNAL_MSG_PUT_DONE, 0,
                                           rx, libmsg,
                                           niov, iov, kiov, offset, mlen);
                return (PTL_OK);
        }
}

ptl_err_t
koibnal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
              unsigned int niov, struct iovec *iov,
              size_t offset, size_t mlen, size_t rlen)
{
        return (koibnal_recvmsg (nal, private, msg, niov, iov, NULL,
                                 offset, mlen, rlen));
}

ptl_err_t
koibnal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
                     unsigned int niov, ptl_kiov_t *kiov,
                     size_t offset, size_t mlen, size_t rlen)
{
        return (koibnal_recvmsg (nal, private, msg, niov, NULL, kiov,
                                 offset, mlen, rlen));
}

int
koibnal_thread_start (int (*fn)(void *arg), void *arg)
{
        long    pid = kernel_thread (fn, arg, 0);

        if (pid < 0)
                return ((int)pid);

        atomic_inc (&koibnal_data.koib_nthreads);
        return (0);
}
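
/* Usage sketch (hypothetical helper, not part of this file): spawning a
 * pool of scheduler daemons at startup.  Each successful
 * koibnal_thread_start() bumps koib_nthreads and each daemon calls
 * koibnal_thread_fini() on exit, so shutdown can wait for the counter
 * to drain back to zero. */
#if 0
static int
koibnal_start_schedulers (int nthreads)
{
        long    i;
        int     rc;

        for (i = 0; i < nthreads; i++) {
                rc = koibnal_thread_start (koibnal_scheduler, (void *)i);
                if (rc != 0) {
                        CERROR ("Can't spawn scheduler[%ld]: %d\n", i, rc);
                        return (rc);
                }
        }
        return (0);
}
#endif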

void
koibnal_thread_fini (void)
{
        atomic_dec (&koibnal_data.koib_nthreads);
}

void
koibnal_close_conn_locked (koib_conn_t *conn, int error)
{
        /* This just does the immediate housekeeping, and schedules the
         * connection for the connd to finish off.
         * Caller holds koib_global_lock exclusively in irq context */
        koib_peer_t   *peer = conn->ibc_peer;

        CDEBUG (error == 0 ? D_NET : D_ERROR,
                "closing conn to "LPX64": error %d\n", peer->ibp_nid, error);

        LASSERT (conn->ibc_state == OPENIBNAL_CONN_ESTABLISHED ||
                 conn->ibc_state == OPENIBNAL_CONN_CONNECTING);

        if (conn->ibc_state == OPENIBNAL_CONN_ESTABLISHED) {
                /* koib_connd_conns takes ibc_list's ref */
                list_del (&conn->ibc_list);
        } else {
                /* new ref for koib_connd_conns */
                CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
                       conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
                       atomic_read (&conn->ibc_refcount));
                atomic_inc (&conn->ibc_refcount);
        }

        if (list_empty (&peer->ibp_conns) &&
            peer->ibp_persistence == 0) {
                /* Non-persistent peer with no more conns... */
                koibnal_unlink_peer_locked (peer);
        }

        conn->ibc_state = OPENIBNAL_CONN_DEATHROW;

        /* Schedule conn for closing/destruction */
        spin_lock (&koibnal_data.koib_connd_lock);

        list_add_tail (&conn->ibc_list, &koibnal_data.koib_connd_conns);
        wake_up (&koibnal_data.koib_connd_waitq);

        spin_unlock (&koibnal_data.koib_connd_lock);
}
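
/* NB the hand-off above is the usual schedule-and-wake pattern: this
 * function may run in irq context, so the conn is queued on
 * koib_connd_conns under koib_connd_lock and the connd is woken to run
 * the DEATHROW -> ZOMBIE teardown in its own process context (see
 * koibnal_connd() below). */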

int
koibnal_close_conn (koib_conn_t *conn, int why)
{
        unsigned long     flags;
        int               count = 0;

        write_lock_irqsave (&koibnal_data.koib_global_lock, flags);

        LASSERT (conn->ibc_state >= OPENIBNAL_CONN_CONNECTING);

        if (conn->ibc_state <= OPENIBNAL_CONN_ESTABLISHED) {
                count = 1;
                koibnal_close_conn_locked (conn, why);
        }

        write_unlock_irqrestore (&koibnal_data.koib_global_lock, flags);
        return (count);
}

void
koibnal_peer_connect_failed (koib_peer_t *peer, int active, int rc)
{
        LIST_HEAD        (zombies);
        koib_tx_t        *tx;
        unsigned long     flags;

        LASSERT (rc != 0);
        LASSERT (peer->ibp_reconnect_interval >= OPENIBNAL_MIN_RECONNECT_INTERVAL);

        write_lock_irqsave (&koibnal_data.koib_global_lock, flags);

        LASSERT (peer->ibp_connecting != 0);
        peer->ibp_connecting--;

        if (peer->ibp_connecting != 0) {
                /* another connection attempt under way (loopback?)... */
                write_unlock_irqrestore (&koibnal_data.koib_global_lock, flags);
                return;
        }

        if (list_empty(&peer->ibp_conns)) {
                /* Say when active connection can be re-attempted */
                peer->ibp_reconnect_time = jiffies + peer->ibp_reconnect_interval;
                /* Increase reconnection interval */
                peer->ibp_reconnect_interval = MIN (peer->ibp_reconnect_interval * 2,
                                                    OPENIBNAL_MAX_RECONNECT_INTERVAL);

                /* Take peer's blocked transmits; I'll complete
                 * them with error */
                while (!list_empty (&peer->ibp_tx_queue)) {
                        tx = list_entry (peer->ibp_tx_queue.next,
                                         koib_tx_t, tx_list);

                        list_del (&tx->tx_list);
                        list_add_tail (&tx->tx_list, &zombies);
                }

                if (koibnal_peer_active(peer) &&
                    (peer->ibp_persistence == 0)) {
                        /* failed connection attempt on non-persistent peer */
                        koibnal_unlink_peer_locked (peer);
                }
        } else {
                /* Can't have blocked transmits if there are connections */
                LASSERT (list_empty(&peer->ibp_tx_queue));
        }

        write_unlock_irqrestore (&koibnal_data.koib_global_lock, flags);

        if (!list_empty (&zombies))
                CERROR ("Deleting messages for "LPX64": connection failed\n",
                        peer->ibp_nid);

        while (!list_empty (&zombies)) {
                tx = list_entry (zombies.next, koib_tx_t, tx_list);

                list_del (&tx->tx_list);
                /* complete now */
                tx->tx_status = -EHOSTUNREACH;
                koibnal_tx_done (tx);
        }
}
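
/* The 'zombies' idiom above is worth noting: the blocked txs are moved
 * onto a private list under koib_global_lock, then completed only after
 * the lock is dropped, since koibnal_tx_done() may deregister memory
 * and must not run under the global lock.  A minimal sketch of the
 * second half in isolation (hypothetical helper): */
#if 0
static void
koibnal_fail_tx_list (struct list_head *zombies, int error)
{
        koib_tx_t *tx;

        /* caller has detached 'zombies' and dropped all locks */
        while (!list_empty (zombies)) {
                tx = list_entry (zombies->next, koib_tx_t, tx_list);

                list_del (&tx->tx_list);
                tx->tx_status = error;          /* e.g. -EHOSTUNREACH */
                koibnal_tx_done (tx);
        }
}
#endif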

void
koibnal_connreq_done (koib_conn_t *conn, int active, int status)
{
        int               state = conn->ibc_state;
        koib_peer_t      *peer = conn->ibc_peer;
        koib_tx_t        *tx;
        unsigned long     flags;
        int               rc;
        int               i;

        /* passive connection has no connreq & vice versa */
        LASSERT (!active == !(conn->ibc_connreq != NULL));
        if (active) {
                PORTAL_FREE (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
                conn->ibc_connreq = NULL;
        }

        if (state == OPENIBNAL_CONN_CONNECTING) {
                /* Install common (active/passive) callback for
                 * disconnect/idle notification if I got as far as getting
                 * a CM comm_id */
                rc = tsIbCmCallbackModify(conn->ibc_comm_id,
                                          koibnal_conn_callback, conn);
                LASSERT (rc == 0);
        }

        write_lock_irqsave (&koibnal_data.koib_global_lock, flags);

        LASSERT (peer->ibp_connecting != 0);

        if (status == 0) {
                /* connection established... */
                LASSERT (state == OPENIBNAL_CONN_CONNECTING);
                conn->ibc_state = OPENIBNAL_CONN_ESTABLISHED;

                if (!koibnal_peer_active(peer)) {
                        /* ...but peer deleted meantime */
                        status = -ECONNABORTED;
                }
        } else {
                LASSERT (state == OPENIBNAL_CONN_INIT_QP ||
                         state == OPENIBNAL_CONN_CONNECTING);
        }

        if (status == 0) {
                /* Everything worked! */

                peer->ibp_connecting--;

                /* +1 ref for ibc_list; caller(== CM)'s ref remains until
                 * the IB_CM_IDLE callback */
                CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
                       conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
                       atomic_read (&conn->ibc_refcount));
                atomic_inc (&conn->ibc_refcount);
                list_add (&conn->ibc_list, &peer->ibp_conns);

                /* reset reconnect interval for next attempt */
                peer->ibp_reconnect_interval = OPENIBNAL_MIN_RECONNECT_INTERVAL;

                /* post blocked sends to the new connection */
                spin_lock (&conn->ibc_lock);

                while (!list_empty (&peer->ibp_tx_queue)) {
                        tx = list_entry (peer->ibp_tx_queue.next,
                                         koib_tx_t, tx_list);

                        list_del (&tx->tx_list);

                        /* +1 ref for each tx */
                        CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
                               conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
                               atomic_read (&conn->ibc_refcount));
                        atomic_inc (&conn->ibc_refcount);
                        koibnal_queue_tx_locked (tx, conn);
                }

                spin_unlock (&conn->ibc_lock);

                /* Nuke any dangling conns from a different peer instance... */
                koibnal_close_stale_conns_locked (conn->ibc_peer,
                                                  conn->ibc_incarnation);

                write_unlock_irqrestore (&koibnal_data.koib_global_lock, flags);

                /* queue up all the receives */
                for (i = 0; i < OPENIBNAL_RX_MSGS; i++) {
                        /* +1 ref for rx desc */
                        CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
                               conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
                               atomic_read (&conn->ibc_refcount));
                        atomic_inc (&conn->ibc_refcount);

                        CDEBUG(D_NET, "RX[%d] %p->%p - "LPX64"\n",
                               i, &conn->ibc_rxs[i], conn->ibc_rxs[i].rx_msg,
                               conn->ibc_rxs[i].rx_vaddr);

                        koibnal_post_rx (&conn->ibc_rxs[i], 0);
                }

                koibnal_check_sends (conn);
                return;
        }

        /* connection failed */
        if (state == OPENIBNAL_CONN_CONNECTING) {
                /* schedule for connd to close */
                koibnal_close_conn_locked (conn, status);
        } else {
                /* Don't have a CM comm_id; just wait for refs to drain */
                conn->ibc_state = OPENIBNAL_CONN_ZOMBIE;
        }

        write_unlock_irqrestore (&koibnal_data.koib_global_lock, flags);

        koibnal_peer_connect_failed (conn->ibc_peer, active, status);

        if (state != OPENIBNAL_CONN_CONNECTING) {
                /* drop caller's ref if we're not waiting for the
                 * IB_CM_IDLE callback */
                koibnal_put_conn (conn);
        }
}

int
koibnal_accept (koib_conn_t **connp, tTS_IB_CM_COMM_ID cid,
                ptl_nid_t nid, __u64 incarnation, int queue_depth)
{
        koib_conn_t   *conn;
        koib_peer_t   *peer;
        koib_peer_t   *peer2;
        unsigned long  flags;

        /* check the handshake before allocating anything */
        if (queue_depth != OPENIBNAL_MSG_QUEUE_SIZE) {
                CERROR("Can't accept "LPX64": bad queue depth %d (%d expected)\n",
                       nid, queue_depth, OPENIBNAL_MSG_QUEUE_SIZE);
                return (-EPROTO);
        }

        conn = koibnal_create_conn();
        if (conn == NULL)
                return (-ENOMEM);

        /* assume 'nid' is a new peer */
        peer = koibnal_create_peer (nid);
        if (peer == NULL) {
                /* NB conn->ibc_peer isn't set yet, so log 'nid' */
                CDEBUG(D_NET, "--conn[%p] state %d -> "LPX64" (%d)\n",
                       conn, conn->ibc_state, nid,
                       atomic_read (&conn->ibc_refcount));
                atomic_dec (&conn->ibc_refcount);
                koibnal_destroy_conn(conn);
                return (-ENOMEM);
        }

        write_lock_irqsave (&koibnal_data.koib_global_lock, flags);

        peer2 = koibnal_find_peer_locked(nid);
        if (peer2 == NULL) {
                /* peer table takes my ref on peer */
                list_add_tail (&peer->ibp_list,
                               koibnal_nid2peerlist(nid));
        } else {
                koibnal_put_peer (peer);
                peer = peer2;
        }

        /* +1 ref for conn */
        atomic_inc (&peer->ibp_refcount);
        peer->ibp_connecting++;

        write_unlock_irqrestore (&koibnal_data.koib_global_lock, flags);

        conn->ibc_peer = peer;
        conn->ibc_state = OPENIBNAL_CONN_CONNECTING;
        conn->ibc_comm_id = cid;
        conn->ibc_incarnation = incarnation;
        conn->ibc_credits = OPENIBNAL_MSG_QUEUE_SIZE;

        *connp = conn;
        return (0);
}
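
/* NB the create-then-lookup dance above: the peer is allocated before
 * taking koib_global_lock (allocation may sleep), then the table is
 * re-checked under the write lock and the loser of any race drops its
 * copy.  A sketch of the idiom in isolation (hypothetical helper): */
#if 0
static koib_peer_t *
koibnal_lookup_or_add_peer (ptl_nid_t nid, koib_peer_t *peer)
{
        koib_peer_t   *peer2;
        unsigned long  flags;

        write_lock_irqsave (&koibnal_data.koib_global_lock, flags);

        peer2 = koibnal_find_peer_locked (nid);
        if (peer2 != NULL) {
                /* lost the race: drop my copy, use the winner's */
                koibnal_put_peer (peer);
                peer = peer2;
        } else {
                /* peer table takes my ref */
                list_add_tail (&peer->ibp_list, koibnal_nid2peerlist (nid));
        }

        write_unlock_irqrestore (&koibnal_data.koib_global_lock, flags);
        return (peer);
}
#endif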

tTS_IB_CM_CALLBACK_RETURN
koibnal_idle_conn_callback (tTS_IB_CM_EVENT event,
                            tTS_IB_CM_COMM_ID cid,
                            void *param,
                            void *arg)
{
        /* Shouldn't ever get a callback after TS_IB_CM_IDLE */
        CERROR ("Unexpected event %d: conn %p\n", event, arg);
        LBUG ();
        return TS_IB_CM_CALLBACK_PROCEED;
}

tTS_IB_CM_CALLBACK_RETURN
koibnal_conn_callback (tTS_IB_CM_EVENT event,
                       tTS_IB_CM_COMM_ID cid,
                       void *param,
                       void *arg)
{
        koib_conn_t *conn = arg;
        int          rc;

        /* Established Connection Notifier */

        switch (event) {
        default:
                CERROR("Connection %p -> "LPX64" ERROR %d\n",
                       conn, conn->ibc_peer->ibp_nid, event);
                koibnal_close_conn (conn, -ECONNABORTED);
                break;

        case TS_IB_CM_DISCONNECTED:
                CDEBUG(D_WARNING, "Connection %p -> "LPX64" DISCONNECTED.\n",
                       conn, conn->ibc_peer->ibp_nid);
                koibnal_close_conn (conn, 0);
                break;

        case TS_IB_CM_IDLE:
                CDEBUG(D_NET, "Connection %p -> "LPX64" IDLE.\n",
                       conn, conn->ibc_peer->ibp_nid);
                koibnal_put_conn (conn);        /* Lose CM's ref */

                /* LASSERT (no further callbacks) */
                rc = tsIbCmCallbackModify(cid,
                                          koibnal_idle_conn_callback, conn);
                LASSERT (rc == 0);
                break;
        }

        return TS_IB_CM_CALLBACK_PROCEED;
}

tTS_IB_CM_CALLBACK_RETURN
koibnal_passive_conn_callback (tTS_IB_CM_EVENT event,
                               tTS_IB_CM_COMM_ID cid,
                               void *param,
                               void *arg)
{
        koib_conn_t *conn = arg;
        int          rc;

        switch (event) {
        default:
                if (conn == NULL) {
                        /* no connection yet */
                        CERROR ("Unexpected event: %d\n", event);
                        return TS_IB_CM_CALLBACK_ABORT;
                }

                CERROR ("Unexpected event %p -> "LPX64": %d\n",
                        conn, conn->ibc_peer->ibp_nid, event);
                koibnal_connreq_done (conn, 0, -ECONNABORTED);
                break;

        case TS_IB_CM_REQ_RECEIVED: {
                struct ib_cm_req_received_param *req = param;
                koib_wire_connreq_t             *wcr = req->remote_private_data;

                LASSERT (conn == NULL);

                CDEBUG(D_NET, "REQ from "LPX64"\n", le64_to_cpu(wcr->wcr_nid));

                if (req->remote_private_data_len < sizeof (*wcr)) {
                        CERROR("Connect from remote LID %04x: too short %d\n",
                               req->dlid, req->remote_private_data_len);
                        return TS_IB_CM_CALLBACK_ABORT;
                }

                if (wcr->wcr_magic != cpu_to_le32(OPENIBNAL_MSG_MAGIC)) {
                        CERROR ("Can't accept LID %04x: bad magic %08x\n",
                                req->dlid, le32_to_cpu(wcr->wcr_magic));
                        return TS_IB_CM_CALLBACK_ABORT;
                }

                if (wcr->wcr_version != cpu_to_le16(OPENIBNAL_MSG_VERSION)) {
                        CERROR ("Can't accept LID %04x: bad version %d\n",
                                req->dlid, le16_to_cpu(wcr->wcr_version));
                        return TS_IB_CM_CALLBACK_ABORT;
                }

                rc = koibnal_accept(&conn,
                                    cid,
                                    le64_to_cpu(wcr->wcr_nid),
                                    le64_to_cpu(wcr->wcr_incarnation),
                                    le16_to_cpu(wcr->wcr_queue_depth));
                if (rc != 0) {
                        CERROR ("Can't accept "LPX64": %d\n",
                                le64_to_cpu(wcr->wcr_nid), rc);
                        return TS_IB_CM_CALLBACK_ABORT;
                }

                /* update 'arg' for next callback */
                rc = tsIbCmCallbackModify(cid,
                                          koibnal_passive_conn_callback, conn);
                LASSERT (rc == 0);

                req->accept_param.qp                     = conn->ibc_qp;
                *((koib_wire_connreq_t *)req->accept_param.reply_private_data)
                        = (koib_wire_connreq_t) {
                                .wcr_magic       = cpu_to_le32(OPENIBNAL_MSG_MAGIC),
                                .wcr_version     = cpu_to_le16(OPENIBNAL_MSG_VERSION),
                                .wcr_queue_depth = cpu_to_le16(OPENIBNAL_MSG_QUEUE_SIZE),
                                .wcr_nid         = cpu_to_le64(koibnal_data.koib_nid),
                                .wcr_incarnation = cpu_to_le64(koibnal_data.koib_incarnation),
                        };
                req->accept_param.reply_private_data_len = sizeof(koib_wire_connreq_t);
                req->accept_param.responder_resources    = OPENIBNAL_RESPONDER_RESOURCES;
                req->accept_param.initiator_depth        = OPENIBNAL_RESPONDER_RESOURCES;
                req->accept_param.rnr_retry_count        = OPENIBNAL_RNR_RETRY;
                req->accept_param.flow_control           = OPENIBNAL_FLOW_CONTROL;

                CDEBUG(D_NET, "Proceeding\n");
                break;
        }

        case TS_IB_CM_ESTABLISHED:
                LASSERT (conn != NULL);
                CDEBUG(D_WARNING, "Connection %p -> "LPX64" ESTABLISHED.\n",
                       conn, conn->ibc_peer->ibp_nid);

                koibnal_connreq_done (conn, 0, 0);
                break;
        }

        /* NB if the connreq is done, we switch to koibnal_conn_callback */
        return TS_IB_CM_CALLBACK_PROCEED;
}
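
/* Both connection callbacks validate the same little-endian wire
 * handshake (koib_wire_connreq_t).  A minimal sketch of the shared
 * checks, factored into a hypothetical helper (the real code inlines
 * them so it can report LID vs NID in the error messages): */
#if 0
static int
koibnal_check_wcr (koib_wire_connreq_t *wcr)
{
        if (wcr->wcr_magic != cpu_to_le32(OPENIBNAL_MSG_MAGIC))
                return (-EPROTO);       /* not us, or wrong byte order */

        if (wcr->wcr_version != cpu_to_le16(OPENIBNAL_MSG_VERSION))
                return (-EPROTO);       /* incompatible protocol version */

        if (wcr->wcr_queue_depth != cpu_to_le16(OPENIBNAL_MSG_QUEUE_SIZE))
                return (-EPROTO);       /* credit accounting would break */

        return (0);
}
#endif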

tTS_IB_CM_CALLBACK_RETURN
koibnal_active_conn_callback (tTS_IB_CM_EVENT event,
                              tTS_IB_CM_COMM_ID cid,
                              void *param,
                              void *arg)
{
        koib_conn_t *conn = arg;

        switch (event) {
        case TS_IB_CM_REP_RECEIVED: {
                struct ib_cm_rep_received_param *rep = param;
                koib_wire_connreq_t             *wcr = rep->remote_private_data;

                if (rep->remote_private_data_len < sizeof (*wcr)) {
                        CERROR ("Short reply from "LPX64": %d\n",
                                conn->ibc_peer->ibp_nid,
                                rep->remote_private_data_len);
                        koibnal_connreq_done (conn, 1, -EPROTO);
                        break;
                }

                if (wcr->wcr_magic != cpu_to_le32(OPENIBNAL_MSG_MAGIC)) {
                        CERROR ("Can't connect "LPX64": bad magic %08x\n",
                                conn->ibc_peer->ibp_nid, le32_to_cpu(wcr->wcr_magic));
                        koibnal_connreq_done (conn, 1, -EPROTO);
                        break;
                }

                if (wcr->wcr_version != cpu_to_le16(OPENIBNAL_MSG_VERSION)) {
                        CERROR ("Can't connect "LPX64": bad version %d\n",
                                conn->ibc_peer->ibp_nid, le16_to_cpu(wcr->wcr_version));
                        koibnal_connreq_done (conn, 1, -EPROTO);
                        break;
                }

                if (wcr->wcr_queue_depth != cpu_to_le16(OPENIBNAL_MSG_QUEUE_SIZE)) {
                        CERROR ("Can't connect "LPX64": bad queue depth %d\n",
                                conn->ibc_peer->ibp_nid, le16_to_cpu(wcr->wcr_queue_depth));
                        koibnal_connreq_done (conn, 1, -EPROTO);
                        break;
                }

                if (le64_to_cpu(wcr->wcr_nid) != conn->ibc_peer->ibp_nid) {
                        CERROR ("Unexpected NID "LPX64" from "LPX64"\n",
                                le64_to_cpu(wcr->wcr_nid), conn->ibc_peer->ibp_nid);
                        koibnal_connreq_done (conn, 1, -EPROTO);
                        break;
                }

                CDEBUG(D_NET, "Connection %p -> "LPX64" REP_RECEIVED.\n",
                       conn, conn->ibc_peer->ibp_nid);

                conn->ibc_incarnation = le64_to_cpu(wcr->wcr_incarnation);
                conn->ibc_credits = OPENIBNAL_MSG_QUEUE_SIZE;
                break;
        }

        case TS_IB_CM_ESTABLISHED:
                CDEBUG(D_WARNING, "Connection %p -> "LPX64" Established\n",
                       conn, conn->ibc_peer->ibp_nid);

                koibnal_connreq_done (conn, 1, 0);
                break;

        case TS_IB_CM_IDLE:
                CERROR("Connection %p -> "LPX64" IDLE\n",
                       conn, conn->ibc_peer->ibp_nid);
                /* Back out state change: I'm disengaged from CM */
                conn->ibc_state = OPENIBNAL_CONN_INIT_QP;

                koibnal_connreq_done (conn, 1, -ECONNABORTED);
                break;

        default:
                CERROR("Connection %p -> "LPX64" ERROR %d\n",
                       conn, conn->ibc_peer->ibp_nid, event);
                koibnal_connreq_done (conn, 1, -ECONNABORTED);
                break;
        }

        /* NB if the connreq is done, we switch to koibnal_conn_callback */
        return TS_IB_CM_CALLBACK_PROCEED;
}

int
koibnal_pathreq_callback (tTS_IB_CLIENT_QUERY_TID tid, int status,
                          struct ib_path_record *resp, int remaining,
                          void *arg)
{
        koib_conn_t *conn = arg;

        if (status != 0) {
                CERROR ("status %d\n", status);
                koibnal_connreq_done (conn, 1, status);
                goto out;
        }

        conn->ibc_connreq->cr_path = *resp;

        conn->ibc_connreq->cr_wcr = (koib_wire_connreq_t) {
                .wcr_magic       = cpu_to_le32(OPENIBNAL_MSG_MAGIC),
                .wcr_version     = cpu_to_le16(OPENIBNAL_MSG_VERSION),
                .wcr_queue_depth = cpu_to_le16(OPENIBNAL_MSG_QUEUE_SIZE),
                .wcr_nid         = cpu_to_le64(koibnal_data.koib_nid),
                .wcr_incarnation = cpu_to_le64(koibnal_data.koib_incarnation),
        };

        conn->ibc_connreq->cr_connparam = (struct ib_cm_active_param) {
                .qp                   = conn->ibc_qp,
                .req_private_data     = &conn->ibc_connreq->cr_wcr,
                .req_private_data_len = sizeof(conn->ibc_connreq->cr_wcr),
                .responder_resources  = OPENIBNAL_RESPONDER_RESOURCES,
                .initiator_depth      = OPENIBNAL_RESPONDER_RESOURCES,
                .retry_count          = OPENIBNAL_RETRY,
                .rnr_retry_count      = OPENIBNAL_RNR_RETRY,
                .cm_response_timeout  = koibnal_tunables.koib_io_timeout,
                .max_cm_retries       = OPENIBNAL_CM_RETRY,
                .flow_control         = OPENIBNAL_FLOW_CONTROL,
        };

        /* XXX set timeout just like SDP!!! */
        conn->ibc_connreq->cr_path.packet_life = 13;

        /* Flag I'm getting involved with the CM... */
        conn->ibc_state = OPENIBNAL_CONN_CONNECTING;

        CDEBUG(D_NET, "Connecting to service id "LPX64", on "LPX64"\n",
               conn->ibc_connreq->cr_service.service_id,
               *koibnal_service_nid_field(&conn->ibc_connreq->cr_service));

        /* koibnal_active_conn_callback gets my conn ref */
        status = ib_cm_connect (&conn->ibc_connreq->cr_connparam,
                                &conn->ibc_connreq->cr_path, NULL,
                                conn->ibc_connreq->cr_service.service_id, 0,
                                koibnal_active_conn_callback, conn,
                                &conn->ibc_comm_id);
        if (status != 0) {
                CERROR ("Connect: %d\n", status);
                /* Back out state change: I've not got a CM comm_id yet... */
                conn->ibc_state = OPENIBNAL_CONN_INIT_QP;
                koibnal_connreq_done (conn, 1, status);
        }

 out:
        /* return non-zero to prevent further callbacks */
        return 1;
}

void
koibnal_service_get_callback (tTS_IB_CLIENT_QUERY_TID tid, int status,
                              struct ib_common_attrib_service *resp, void *arg)
{
        koib_conn_t *conn = arg;

        if (status != 0) {
                CERROR ("status %d\n", status);
                koibnal_connreq_done (conn, 1, status);
                return;
        }

        CDEBUG(D_NET, "Got status %d, service id "LPX64", on "LPX64"\n",
               status, resp->service_id,
               *koibnal_service_nid_field(resp));

        conn->ibc_connreq->cr_service = *resp;

        status = ib_cached_gid_get(koibnal_data.koib_device,
                                   koibnal_data.koib_port, 0,
                                   conn->ibc_connreq->cr_gid);
        LASSERT (status == 0);

        /* koibnal_pathreq_callback gets my conn ref */
        status = tsIbPathRecordRequest (koibnal_data.koib_device,
                                        koibnal_data.koib_port,
                                        conn->ibc_connreq->cr_gid,
                                        conn->ibc_connreq->cr_service.service_gid,
                                        conn->ibc_connreq->cr_service.service_pkey,
                                        0,
                                        koibnal_tunables.koib_io_timeout * HZ,
                                        0,
                                        koibnal_pathreq_callback, conn,
                                        &conn->ibc_connreq->cr_tid);

        if (status == 0)
                return;

        CERROR ("Path record request: %d\n", status);
        koibnal_connreq_done (conn, 1, status);
}

void
koibnal_connect_peer (koib_peer_t *peer)
{
        koib_conn_t *conn = koibnal_create_conn();
        int          rc;

        LASSERT (peer->ibp_connecting != 0);

        if (conn == NULL) {
                CERROR ("Can't allocate conn\n");
                koibnal_peer_connect_failed (peer, 1, -ENOMEM);
                return;
        }

        conn->ibc_peer = peer;
        atomic_inc (&peer->ibp_refcount);

        PORTAL_ALLOC (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
        if (conn->ibc_connreq == NULL) {
                CERROR ("Can't allocate connreq\n");
                koibnal_connreq_done (conn, 1, -ENOMEM);
                return;
        }

        memset(conn->ibc_connreq, 0, sizeof (*conn->ibc_connreq));

        koibnal_set_service_keys(&conn->ibc_connreq->cr_service, peer->ibp_nid);

        /* koibnal_service_get_callback gets my conn ref */
        rc = ib_service_get (koibnal_data.koib_device,
                             koibnal_data.koib_port,
                             &conn->ibc_connreq->cr_service,
                             KOIBNAL_SERVICE_KEY_MASK,
                             koibnal_tunables.koib_io_timeout * HZ,
                             koibnal_service_get_callback, conn,
                             &conn->ibc_connreq->cr_tid);

        if (rc == 0)
                return;

        CERROR ("ib_service_get: %d\n", rc);
        koibnal_connreq_done (conn, 1, rc);
}
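
/* Active connection establishment is a chain of asynchronous callbacks,
 * each holding the conn ref in turn:
 *
 *   koibnal_connect_peer()             -> ib_service_get()
 *     koibnal_service_get_callback()   -> tsIbPathRecordRequest()
 *       koibnal_pathreq_callback()     -> ib_cm_connect()
 *         koibnal_active_conn_callback()  (CM events)
 *
 * Any failure along the way calls koibnal_connreq_done(conn, 1, status)
 * exactly once, which unwinds the connreq and fails the peer's queued
 * transmits via koibnal_peer_connect_failed(). */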

int
koibnal_conn_timed_out (koib_conn_t *conn)
{
        koib_tx_t         *tx;
        struct list_head  *ttmp;
        unsigned long      flags;
        int                rc = 0;

        spin_lock_irqsave (&conn->ibc_lock, flags);

        list_for_each (ttmp, &conn->ibc_rdma_queue) {
                tx = list_entry (ttmp, koib_tx_t, tx_list);

                LASSERT (tx->tx_passive_rdma);
                LASSERT (tx->tx_passive_rdma_wait);

                if (time_after_eq (jiffies, tx->tx_passive_rdma_deadline)) {
                        rc = 1;
                        break;
                }
        }
        spin_unlock_irqrestore (&conn->ibc_lock, flags);

        return rc;
}
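
/* time_after_eq() is used above (rather than a plain '>=') so the test
 * still works when jiffies wraps.  For reference, a sketch of how a
 * deadline like tx_passive_rdma_deadline is typically armed when a tx
 * starts waiting (hypothetical fragment; the real assignment lives in
 * the tx queueing path): */
#if 0
        tx->tx_passive_rdma_wait     = 1;
        tx->tx_passive_rdma_deadline = jiffies +
                                       koibnal_tunables.koib_io_timeout * HZ;
#endif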

void
koibnal_check_conns (int idx)
{
        struct list_head  *peers = &koibnal_data.koib_peers[idx];
        struct list_head  *ptmp;
        koib_peer_t       *peer;
        koib_conn_t       *conn;
        struct list_head  *ctmp;

 again:
        /* NB. We expect to have a look at all the peers and not find any
         * rdmas to time out, so we just use a shared lock while we
         * take a look... */
        read_lock (&koibnal_data.koib_global_lock);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, koib_peer_t, ibp_list);

                list_for_each (ctmp, &peer->ibp_conns) {
                        conn = list_entry (ctmp, koib_conn_t, ibc_list);

                        LASSERT (conn->ibc_state == OPENIBNAL_CONN_ESTABLISHED);

                        /* In case we have enough credits to return via a
                         * NOOP, but there were no non-blocking tx descs
                         * free to do it last time... */
                        koibnal_check_sends(conn);

                        if (!koibnal_conn_timed_out(conn))
                                continue;

                        CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
                               conn, conn->ibc_state, peer->ibp_nid,
                               atomic_read (&conn->ibc_refcount));

                        atomic_inc (&conn->ibc_refcount);
                        read_unlock (&koibnal_data.koib_global_lock);

                        CERROR("Timed out RDMA with "LPX64"\n",
                               peer->ibp_nid);

                        koibnal_close_conn (conn, -ETIMEDOUT);
                        koibnal_put_conn (conn);

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock (&koibnal_data.koib_global_lock);
}
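
/* NB the 'goto again' above: list membership can change once the read
 * lock is dropped, so rather than holding a cursor across the unlock,
 * the scan takes a conn ref, closes the conn outside the lock, and
 * restarts the bucket from the top.  Re-closing is harmless since
 * koibnal_close_conn() only acts on conns still <= ESTABLISHED. */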

void
koibnal_terminate_conn (koib_conn_t *conn)
{
        unsigned long flags;
        int           rc;
        int           done;

        CDEBUG(D_NET, "conn %p\n", conn);
        LASSERT (conn->ibc_state == OPENIBNAL_CONN_DEATHROW);
        conn->ibc_state = OPENIBNAL_CONN_ZOMBIE;

        rc = ib_cm_disconnect (conn->ibc_comm_id);
        if (rc != 0)
                CERROR ("Error %d disconnecting conn %p -> "LPX64"\n",
                        rc, conn, conn->ibc_peer->ibp_nid);

        /* complete blocked passive RDMAs */
        spin_lock_irqsave (&conn->ibc_lock, flags);

        while (!list_empty (&conn->ibc_rdma_queue)) {
                koib_tx_t *tx = list_entry (conn->ibc_rdma_queue.next,
                                            koib_tx_t, tx_list);

                LASSERT (tx->tx_passive_rdma);
                LASSERT (tx->tx_passive_rdma_wait);

                list_del (&tx->tx_list);

                tx->tx_passive_rdma_wait = 0;
                done = (tx->tx_sending == 0);

                tx->tx_status = -ECONNABORTED;

                spin_unlock_irqrestore (&conn->ibc_lock, flags);

                if (done)
                        koibnal_tx_done (tx);

                spin_lock_irqsave (&conn->ibc_lock, flags);
        }

        spin_unlock_irqrestore (&conn->ibc_lock, flags);

        /* Complete all blocked transmits */
        koibnal_check_sends(conn);
}

int
koibnal_connd (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        koib_conn_t       *conn;
        koib_peer_t       *peer;
        int                timeout;
        int                i;
        int                peer_index = 0;
        unsigned long      deadline = jiffies;

        kportal_daemonize ("koibnal_connd");
        kportal_blockallsigs ();

        init_waitqueue_entry (&wait, current);

        spin_lock_irqsave (&koibnal_data.koib_connd_lock, flags);

        for (;;) {
                if (!list_empty (&koibnal_data.koib_connd_conns)) {
                        conn = list_entry (koibnal_data.koib_connd_conns.next,
                                           koib_conn_t, ibc_list);
                        list_del (&conn->ibc_list);

                        spin_unlock_irqrestore (&koibnal_data.koib_connd_lock, flags);

                        switch (conn->ibc_state) {
                        case OPENIBNAL_CONN_DEATHROW:
                                LASSERT (conn->ibc_comm_id != TS_IB_CM_COMM_ID_INVALID);
                                /* Disconnect: conn becomes a zombie in the
                                 * callback and last ref reschedules it
                                 * here... */
                                koibnal_terminate_conn(conn);
                                koibnal_put_conn (conn);
                                break;

                        case OPENIBNAL_CONN_ZOMBIE:
                                koibnal_destroy_conn (conn);
                                break;

                        default:
                                CERROR ("Bad conn %p state: %d\n",
                                        conn, conn->ibc_state);
                                LBUG();
                        }

                        spin_lock_irqsave (&koibnal_data.koib_connd_lock, flags);
                        continue;
                }

                if (!list_empty (&koibnal_data.koib_connd_peers)) {
                        peer = list_entry (koibnal_data.koib_connd_peers.next,
                                           koib_peer_t, ibp_connd_list);

                        list_del_init (&peer->ibp_connd_list);
                        spin_unlock_irqrestore (&koibnal_data.koib_connd_lock, flags);

                        koibnal_connect_peer (peer);
                        koibnal_put_peer (peer);

                        spin_lock_irqsave (&koibnal_data.koib_connd_lock, flags);
                }

                /* shut down and nobody left to reap... */
                if (koibnal_data.koib_shutdown &&
                    atomic_read(&koibnal_data.koib_nconns) == 0)
                        break;

                spin_unlock_irqrestore (&koibnal_data.koib_connd_lock, flags);

                /* careful with the jiffy wrap... */
                while ((timeout = (int)(deadline - jiffies)) <= 0) {
                        const int n = 4;
                        const int p = 1;
                        int       chunk = koibnal_data.koib_peer_hash_size;

                        /* Time to check for RDMA timeouts on a few more
                         * peers: I do checks every 'p' seconds on a
                         * proportion of the peer table and I need to check
                         * every connection 'n' times within a timeout
                         * interval, to ensure I detect a timeout on any
                         * connection within (n+1)/n times the timeout
                         * interval. */

                        if (koibnal_tunables.koib_io_timeout > n * p)
                                chunk = (chunk * n * p) /
                                        koibnal_tunables.koib_io_timeout;
                        if (chunk == 0)
                                chunk = 1;

                        for (i = 0; i < chunk; i++) {
                                koibnal_check_conns (peer_index);
                                peer_index = (peer_index + 1) %
                                             koibnal_data.koib_peer_hash_size;
                        }

                        deadline += p * HZ;
                }
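
                /* Worked example of the chunk arithmetic above
                 * (illustrative numbers, not defaults taken from this
                 * tree): with p == 1, n == 4, a peer hash of 101 buckets
                 * and koib_io_timeout == 50s, chunk = (101 * 4 * 1) / 50
                 * == 8, so 8 buckets are scanned per wakeup and the whole
                 * table roughly every 13s -- about timeout/n, giving
                 * detection within ~(n+1)/n times the timeout. */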

                koibnal_data.koib_connd_waketime = jiffies + timeout;

                set_current_state (TASK_INTERRUPTIBLE);
                add_wait_queue (&koibnal_data.koib_connd_waitq, &wait);

                if (!koibnal_data.koib_shutdown &&
                    list_empty (&koibnal_data.koib_connd_conns) &&
                    list_empty (&koibnal_data.koib_connd_peers))
                        schedule_timeout (timeout);

                set_current_state (TASK_RUNNING);
                remove_wait_queue (&koibnal_data.koib_connd_waitq, &wait);

                spin_lock_irqsave (&koibnal_data.koib_connd_lock, flags);
        }

        spin_unlock_irqrestore (&koibnal_data.koib_connd_lock, flags);

        koibnal_thread_fini ();
        return (0);
}

int
koibnal_scheduler(void *arg)
{
        long            id = (long)arg;
        char            name[16];
        koib_rx_t      *rx;
        koib_tx_t      *tx;
        unsigned long   flags;
        int             rc;
        int             counter = 0;
        int             did_something;

        snprintf(name, sizeof(name), "koibnal_sd_%02ld", id);
        kportal_daemonize(name);
        kportal_blockallsigs();

        spin_lock_irqsave(&koibnal_data.koib_sched_lock, flags);

        for (;;) {
                did_something = 0;

                while (!list_empty(&koibnal_data.koib_sched_txq)) {
                        tx = list_entry(koibnal_data.koib_sched_txq.next,
                                        koib_tx_t, tx_list);
                        list_del(&tx->tx_list);
                        spin_unlock_irqrestore(&koibnal_data.koib_sched_lock,
                                               flags);
                        koibnal_tx_done(tx);

                        spin_lock_irqsave(&koibnal_data.koib_sched_lock,
                                          flags);
                }

                if (!list_empty(&koibnal_data.koib_sched_rxq)) {
                        rx = list_entry(koibnal_data.koib_sched_rxq.next,
                                        koib_rx_t, rx_list);
                        list_del(&rx->rx_list);
                        spin_unlock_irqrestore(&koibnal_data.koib_sched_lock,
                                               flags);

                        koibnal_rx(rx);

                        did_something = 1;
                        spin_lock_irqsave(&koibnal_data.koib_sched_lock,
                                          flags);
                }

                /* shut down and no receives to complete... */
                if (koibnal_data.koib_shutdown &&
                    atomic_read(&koibnal_data.koib_nconns) == 0)
                        break;

                /* nothing to do or hogging CPU */
                if (!did_something || counter++ == OPENIBNAL_RESCHED) {
                        spin_unlock_irqrestore(&koibnal_data.koib_sched_lock,
                                               flags);
                        counter = 0;

                        if (!did_something) {
                                rc = wait_event_interruptible(
                                        koibnal_data.koib_sched_waitq,
                                        !list_empty(&koibnal_data.koib_sched_txq) ||
                                        !list_empty(&koibnal_data.koib_sched_rxq) ||
                                        (koibnal_data.koib_shutdown &&
                                         atomic_read (&koibnal_data.koib_nconns) == 0));
                        } else {
                                our_cond_resched();
                        }

                        spin_lock_irqsave(&koibnal_data.koib_sched_lock,
                                          flags);
                }
        }

        spin_unlock_irqrestore(&koibnal_data.koib_sched_lock, flags);

        koibnal_thread_fini();
        return (0);
}

lib_nal_t koibnal_lib = {
        libnal_data:        &koibnal_data,      /* NAL private data */
        libnal_send:         koibnal_send,
        libnal_send_pages:   koibnal_send_pages,
        libnal_recv:         koibnal_recv,
        libnal_recv_pages:   koibnal_recv_pages,
        libnal_dist:         koibnal_dist
};
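
/* NB the portals library dispatches through this table: libnal_send /
 * libnal_recv carry virtual-address iovecs while libnal_send_pages /
 * libnal_recv_pages carry page-based kiovs; all four funnel into
 * koibnal_sendmsg()/koibnal_recvmsg() above with the unused vector
 * passed as NULL, so a payload is never both kinds at once. */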