Whamcloud - gitweb
7e56807276a76a85e8c333277c2a9098cb1597dd
[fs/lustre-release.git] / lnet / klnds / openiblnd / openiblnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see [sun.com URL with a
20  * copy of GPLv2].
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/openiblnd/openiblnd_cb.c
37  *
38  * Author: Eric Barton <eric@bartonsoftware.com>
39  */
40
41 #include "openiblnd.h"
42
43 /*
44  *  LIB functions follow
45  *
46  */
47 void
48 kibnal_schedule_tx_done (kib_tx_t *tx)
49 {
50         unsigned long flags;
51
52         spin_lock_irqsave (&kibnal_data.kib_sched_lock, flags);
53
54         list_add_tail(&tx->tx_list, &kibnal_data.kib_sched_txq);
55         wake_up (&kibnal_data.kib_sched_waitq);
56
57         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
58 }
59
/* Finalise a completed transmit descriptor: deregister any mapped
 * memory, finalise the LNet message(s) it carried and return it to the
 * idle pool.  Must not be called while sends are outstanding
 * (tx_sending != 0) or while awaiting passive RDMA completion.  From
 * IRQ context the work is handed off to the scheduler threads. */
void
kibnal_tx_done (kib_tx_t *tx)
{
        lnet_msg_t      *lntmsg[2];
        unsigned long    flags;
        int              i;
        int              rc;

        LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting callback */
        LASSERT (!tx->tx_passive_rdma_wait);    /* mustn't be awaiting RDMA */

        if (in_interrupt()) {
                /* can't deregister memory/flush FMAs/finalize in IRQ context... */
                kibnal_schedule_tx_done(tx);
                return;
        }

        /* undo whatever mapping kibnal_map_iov/kibnal_map_kiov did */
        switch (tx->tx_mapped) {
        default:
                LBUG();

        case KIB_TX_UNMAPPED:
                break;
                
        case KIB_TX_MAPPED:
                rc = ib_memory_deregister(tx->tx_md.md_handle.mr);
                LASSERT (rc == 0);
                tx->tx_mapped = KIB_TX_UNMAPPED;
                break;

#if IBNAL_FMR
        case KIB_TX_MAPPED_FMR:
                rc = ib_fmr_deregister(tx->tx_md.md_handle.fmr);
                LASSERT (rc == 0);

#ifndef USING_TSAPI
                /* Somewhat belt-and-braces since the tx's conn has closed if
                 * this was a passive RDMA waiting to complete... */
                if (tx->tx_status != 0)
                        ib_fmr_pool_force_flush(kibnal_data.kib_fmr_pool);
#endif
                tx->tx_mapped = KIB_TX_UNMAPPED;
                break;
#endif
        }

        /* tx may have up to 2 ptlmsgs to finalise */
        lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
        lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
        rc = tx->tx_status;

        if (tx->tx_conn != NULL) {
                /* drop the tx's ref on its connection */
                kibnal_conn_decref(tx->tx_conn);
                tx->tx_conn = NULL;
        }

        /* scrub state so the descriptor is clean for reuse */
        tx->tx_nsp = 0;
        tx->tx_passive_rdma = 0;
        tx->tx_status = 0;

        spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);

        list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_txs);

        spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);

        /* delay finalize until my descs have been freed */
        for (i = 0; i < 2; i++) {
                if (lntmsg[i] == NULL)
                        continue;

                lnet_finalize (kibnal_data.kib_ni, lntmsg[i], rc);
        }
}
134
135 kib_tx_t *
136 kibnal_get_idle_tx (void) 
137 {
138         unsigned long  flags;
139         kib_tx_t      *tx;
140         
141         spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);
142
143         if (list_empty (&kibnal_data.kib_idle_txs)) {
144                 spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
145                 return NULL;
146         }
147
148         tx = list_entry (kibnal_data.kib_idle_txs.next, kib_tx_t, tx_list);
149         list_del (&tx->tx_list);
150
151         /* Allocate a new passive RDMA completion cookie.  It might not be
152          * needed, but we've got a lock right now and we're unlikely to
153          * wrap... */
154         tx->tx_passive_rdma_cookie = kibnal_data.kib_next_tx_cookie++;
155
156         spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
157
158         LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
159         LASSERT (tx->tx_nsp == 0);
160         LASSERT (tx->tx_sending == 0);
161         LASSERT (tx->tx_status == 0);
162         LASSERT (tx->tx_conn == NULL);
163         LASSERT (!tx->tx_passive_rdma);
164         LASSERT (!tx->tx_passive_rdma_wait);
165         LASSERT (tx->tx_lntmsg[0] == NULL);
166         LASSERT (tx->tx_lntmsg[1] == NULL);
167
168         return tx;
169 }
170
/* The peer has reported completion (with 'status') of the passive RDMA
 * identified by 'cookie' on 'conn'.  Find the matching tx on the
 * connection's active list, record the status, and complete the tx if
 * no send work requests are still outstanding on it.  An unmatched
 * cookie (e.g. a completion arriving after the tx was failed out) is
 * logged and otherwise ignored. */
void
kibnal_complete_passive_rdma(kib_conn_t *conn, __u64 cookie, int status)
{
        struct list_head *ttmp;
        unsigned long     flags;
        int               idle;

        spin_lock_irqsave (&conn->ibc_lock, flags);

        list_for_each (ttmp, &conn->ibc_active_txs) {
                kib_tx_t *tx = list_entry(ttmp, kib_tx_t, tx_list);

                /* only a passive RDMA tx may be waiting for RDMA completion */
                LASSERT (tx->tx_passive_rdma ||
                         !tx->tx_passive_rdma_wait);

                /* an active tx awaits RDMA completion and/or send callbacks */
                LASSERT (tx->tx_passive_rdma_wait ||
                         tx->tx_sending != 0);

                if (!tx->tx_passive_rdma_wait ||
                    tx->tx_passive_rdma_cookie != cookie)
                        continue;

                CDEBUG(D_NET, "Complete %p "LPD64": %d\n", tx, cookie, status);

                /* XXX Set mlength of reply here */

                tx->tx_status = status;
                tx->tx_passive_rdma_wait = 0;
                idle = (tx->tx_sending == 0);

                if (idle)
                        list_del (&tx->tx_list);

                spin_unlock_irqrestore (&conn->ibc_lock, flags);

                /* I could be racing with tx callbacks.  It's whoever
                 * _makes_ tx idle that frees it */
                if (idle)
                        kibnal_tx_done (tx);
                return;
        }
                
        spin_unlock_irqrestore (&conn->ibc_lock, flags);

        CERROR ("Unmatched (late?) RDMA completion "LPX64" from %s\n",
                cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
}
218
/* Post (or re-post) receive buffer 'rx' on its connection's QP.
 * 'credit' != 0 means this posting returns a flow-control credit to the
 * peer; 'rsrvd_credit' != 0 means it returns a reserved RDMA-reply
 * credit (only legal on connection versions that reserve reply
 * buffers).  On failure the connection is closed (if still established)
 * and the rx's reference on it is dropped. */
void
kibnal_post_rx (kib_rx_t *rx, int credit, int rsrvd_credit)
{
        kib_conn_t   *conn = rx->rx_conn;
        int           rc;
        unsigned long flags;

        LASSERT(!rsrvd_credit ||
                conn->ibc_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);

        rx->rx_gl = (struct ib_gather_scatter) {
                .address = rx->rx_vaddr,
                .length  = IBNAL_MSG_SIZE,
                .key     = conn->ibc_rx_pages->ibp_lkey,
        };

        rx->rx_sp = (struct ib_receive_param) {
                .work_request_id        = kibnal_ptr2wreqid(rx, 1),
                .scatter_list           = &rx->rx_gl,
                .num_scatter_entries    = 1,
                .device_specific        = NULL,
                .signaled               = 1,
        };

        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
        LASSERT (rx->rx_nob >= 0);              /* not posted */
        rx->rx_nob = -1;                        /* is now */
        mb();                                   /* 'posted' state visible before completion can race */

        if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
                rc = -ECONNABORTED;
        else
                rc = kibnal_ib_receive(conn->ibc_qp, &rx->rx_sp);

        if (rc == 0) {
                if (credit || rsrvd_credit) {
                        spin_lock_irqsave(&conn->ibc_lock, flags);

                        if (credit)
                                conn->ibc_outstanding_credits++;
                        if (rsrvd_credit)
                                conn->ibc_reserved_credits++;
                        
                        spin_unlock_irqrestore(&conn->ibc_lock, flags);

                        /* returned credits may unblock queued sends */
                        kibnal_check_sends(conn);
                }
                return;
        }

        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                CERROR ("Error posting receive -> %s: %d\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
                kibnal_close_conn (rx->rx_conn, rc);
        } else {
                /* connection already closing; failure here is expected */
                CDEBUG (D_NET, "Error posting receive -> %s: %d\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
        }

        /* Drop rx's ref */
        kibnal_conn_decref(conn);
}
281
/* IRQ-context completion handler for one receive.  Validates and
 * unpacks the incoming message, absorbs any returned flow-control
 * credits, handles NOOP and PUT/GET_DONE messages inline (re-posting
 * the buffer) and schedules everything else for kibnal_rx() in thread
 * context.  On any failure the connection is closed and the rx's ref on
 * it dropped without re-posting the buffer. */
void
kibnal_rx_callback (struct ib_cq_entry *e)
{
        kib_rx_t     *rx = (kib_rx_t *)kibnal_wreqid2ptr(e->work_request_id);
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        int           credits;
        unsigned long flags;
        int           rc;
        int           err = -ECONNABORTED;

        CDEBUG (D_NET, "rx %p conn %p\n", rx, conn);
        LASSERT (rx->rx_nob < 0);               /* was posted */
        rx->rx_nob = 0;                         /* isn't now */
        mb();

        /* receives complete with error in any case after we've started
         * closing the QP */
        if (conn->ibc_state >= IBNAL_CONN_DEATHROW)
                goto failed;

        /* We don't post receives until the conn is established */
        LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);

        if (e->status != IB_COMPLETION_STATUS_SUCCESS) {
                CERROR("Rx from %s failed: %d\n", 
                       libcfs_nid2str(conn->ibc_peer->ibp_nid), e->status);
                goto failed;
        }

        LASSERT (e->bytes_transferred >= 0);
        rx->rx_nob = e->bytes_transferred;
        mb();

        /* checks magic/version/checksum and byteswaps if required */
        rc = kibnal_unpack_msg(msg, conn->ibc_version, rx->rx_nob);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from %s\n",
                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                goto failed;
        }

        /* reject messages from a previous incarnation of this peer/NI */
        if (!lnet_ptlcompat_matchnid(conn->ibc_peer->ibp_nid,
                                     msg->ibm_srcnid) ||
            !lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
                                     msg->ibm_dstnid) ||
            msg->ibm_srcstamp != conn->ibc_incarnation ||
            msg->ibm_dststamp != kibnal_data.kib_incarnation) {
                CERROR ("Stale rx from %s\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
                err = -ESTALE;
                goto failed;
        }

        /* Have I received credits that will let me send? */
        credits = msg->ibm_credits;
        if (credits != 0) {
                spin_lock_irqsave(&conn->ibc_lock, flags);
                conn->ibc_credits += credits;
                spin_unlock_irqrestore(&conn->ibc_lock, flags);
                
                kibnal_check_sends(conn);
        }

        switch (msg->ibm_type) {
        case IBNAL_MSG_NOOP:
                /* nothing to deliver; recycle the buffer, returning a credit */
                kibnal_post_rx (rx, 1, 0);
                return;

        case IBNAL_MSG_IMMEDIATE:
                break;
                
        case IBNAL_MSG_PUT_RDMA:
        case IBNAL_MSG_GET_RDMA:
                CDEBUG(D_NET, "%d RDMA: cookie "LPX64", key %x, addr "LPX64", nob %d\n",
                       msg->ibm_type, msg->ibm_u.rdma.ibrm_cookie,
                       msg->ibm_u.rdma.ibrm_desc.rd_key,
                       msg->ibm_u.rdma.ibrm_desc.rd_addr,
                       msg->ibm_u.rdma.ibrm_desc.rd_nob);
                break;
                
        case IBNAL_MSG_PUT_DONE:
        case IBNAL_MSG_GET_DONE:
                CDEBUG(D_NET, "%d DONE: cookie "LPX64", status %d\n",
                       msg->ibm_type, msg->ibm_u.completion.ibcm_cookie,
                       msg->ibm_u.completion.ibcm_status);

                kibnal_complete_passive_rdma (conn, 
                                              msg->ibm_u.completion.ibcm_cookie,
                                              msg->ibm_u.completion.ibcm_status);

                if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {
                        kibnal_post_rx (rx, 1, 0);
                } else {
                        /* this reply buffer was pre-reserved */
                        kibnal_post_rx (rx, 0, 1);
                }
                return;
                        
        default:
                CERROR ("Bad msg type %x from %s\n",
                        msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                goto failed;
        }

        kibnal_peer_alive(conn->ibc_peer);

        /* schedule for kibnal_rx() in thread context */
        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
        
        list_add_tail (&rx->rx_list, &kibnal_data.kib_sched_rxq);
        wake_up (&kibnal_data.kib_sched_waitq);
        
        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
        return;
        
 failed:
        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        kibnal_close_conn(conn, err);

        /* Don't re-post rx & drop its ref on conn */
        kibnal_conn_decref(conn);
}
404
405 void
406 kibnal_rx (kib_rx_t *rx)
407 {
408         int          rc = 0;
409         kib_msg_t   *msg = rx->rx_msg;
410
411         switch (msg->ibm_type) {
412         case IBNAL_MSG_GET_RDMA:
413                 rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.rdma.ibrm_hdr,
414                                 msg->ibm_srcnid, rx, 1);
415                 break;
416                 
417         case IBNAL_MSG_PUT_RDMA:
418                 rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.rdma.ibrm_hdr,
419                                 msg->ibm_srcnid, rx, 1);
420                 break;
421
422         case IBNAL_MSG_IMMEDIATE:
423                 rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.immediate.ibim_hdr,
424                                 msg->ibm_srcnid, rx, 0);
425                 break;
426
427         default:
428                 LBUG();
429                 break;
430         }
431
432         if (rc < 0) {
433                 kibnal_close_conn(rx->rx_conn, rc);
434                 kibnal_post_rx (rx, 1, 0);
435         }
436 }
437
#if 0
/* Unused (compiled out): translate a kernel virtual address -- vmalloc,
 * kmap or direct-mapped -- to a physical address.  Kept for reference. */
int
kibnal_kvaddr_to_phys (unsigned long vaddr, __u64 *physp)
{
        struct page *page;

        if (vaddr >= VMALLOC_START &&
            vaddr < VMALLOC_END)
                page = vmalloc_to_page ((void *)vaddr);
#ifdef CONFIG_HIGHMEM
        else if (vaddr >= PKMAP_BASE &&
                 vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
                page = vmalloc_to_page ((void *)vaddr);
        /* in 2.4 ^ just walks the page tables */
#endif
        else
                page = virt_to_page (vaddr);

        if (page == NULL ||
            !VALID_PAGE (page))
                return (-EFAULT);

        /* physical page address plus offset within the page */
        *physp = lnet_page2phys(page) + (vaddr & (PAGE_SIZE - 1));
        return (0);
}
#endif
464
465 int
466 kibnal_map_iov (kib_tx_t *tx, int access,
467                 unsigned int niov, struct iovec *iov, int offset, int nob)
468                  
469 {
470         void   *vaddr;
471         int     rc;
472
473         LASSERT (nob > 0);
474         LASSERT (niov > 0);
475         LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
476
477         while (offset >= iov->iov_len) {
478                 offset -= iov->iov_len;
479                 niov--;
480                 iov++;
481                 LASSERT (niov > 0);
482         }
483
484         if (nob > iov->iov_len - offset) {
485                 CERROR ("Can't map multiple vaddr fragments\n");
486                 return (-EMSGSIZE);
487         }
488
489         vaddr = (void *)(((unsigned long)iov->iov_base) + offset);
490         tx->tx_md.md_addr = (__u64)((unsigned long)vaddr);
491
492         rc = ib_memory_register (kibnal_data.kib_pd,
493                                  vaddr, nob,
494                                  access,
495                                  &tx->tx_md.md_handle.mr,
496                                  &tx->tx_md.md_lkey,
497                                  &tx->tx_md.md_rkey);
498         
499         if (rc != 0) {
500                 CERROR ("Can't map vaddr: %d\n", rc);
501                 return (rc);
502         }
503
504         tx->tx_mapped = KIB_TX_MAPPED;
505         return (0);
506 }
507
/* Map a page-based (kiov) payload for RDMA with the given access
 * rights.  The pages spanned by (offset, nob) must form one virtually
 * contiguous region: every fragment after the first must start at page
 * offset 0 and, unless it is the last, cover a whole page.  Registers
 * the region via the FMR pool or the protection domain depending on
 * IBNAL_FMR, fills in the tx's memory descriptor and sets tx_mapped.
 * Returns 0 or a negative errno. */
int
kibnal_map_kiov (kib_tx_t *tx, int access,
                  int nkiov, lnet_kiov_t *kiov,
                  int offset, int nob)
{
#if IBNAL_FMR
        __u64                      *phys;
        const int                   mapped = KIB_TX_MAPPED_FMR;
#else
        struct ib_physical_buffer  *phys;
        const int                   mapped = KIB_TX_MAPPED;
#endif
        int                         page_offset;
        int                         nphys;
        int                         resid;
        int                         phys_size;
        int                         rc;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);

        /* skip whole fragments until 'offset' lands inside one */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        /* temporary physical-page array for registration (freed below) */
        phys_size = nkiov * sizeof (*phys);
        LIBCFS_ALLOC(phys, phys_size);
        if (phys == NULL) {
                CERROR ("Can't allocate tmp phys\n");
                return (-ENOMEM);
        }

        page_offset = kiov->kiov_offset + offset;
#if IBNAL_FMR
        phys[0] = lnet_page2phys(kiov->kiov_page);
#else
        phys[0].address = lnet_page2phys(kiov->kiov_page);
        phys[0].size = PAGE_SIZE;
#endif
        nphys = 1;
        resid = nob - (kiov->kiov_len - offset);

        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                /* subsequent fragments must be page-aligned and (except the
                 * last) full pages, or the region isn't contiguous */
                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) && 
                     kiov->kiov_len < PAGE_SIZE)) {
                        int i;
                        /* Can't have gaps */
                        CERROR ("Can't make payload contiguous in I/O VM:"
                                "page %d, offset %d, len %d \n", nphys, 
                                kiov->kiov_offset, kiov->kiov_len);

                        /* dump the pages already accepted (negative indices
                         * reach back over the consumed fragments) and the rest */
                        for (i = -nphys; i < nkiov; i++) 
                        {
                                CERROR("kiov[%d] %p +%d for %d\n",
                                       i, kiov[i].kiov_page, kiov[i].kiov_offset, kiov[i].kiov_len);
                        }
                        
                        rc = -EINVAL;
                        goto out;
                }

                if (nphys == LNET_MAX_IOV) {
                        CERROR ("payload too big (%d)\n", nphys);
                        rc = -EMSGSIZE;
                        goto out;
                }

                LASSERT (nphys * sizeof (*phys) < phys_size);
#if IBNAL_FMR
                phys[nphys] = lnet_page2phys(kiov->kiov_page);
#else
                phys[nphys].address = lnet_page2phys(kiov->kiov_page);
                phys[nphys].size = PAGE_SIZE;
#endif
                nphys++;

                resid -= PAGE_SIZE;
        }

        tx->tx_md.md_addr = IBNAL_RDMA_BASE;

#if IBNAL_FMR
        rc = ib_fmr_register_physical (kibnal_data.kib_fmr_pool,
                                       phys, nphys,
                                       &tx->tx_md.md_addr,
                                       page_offset,
                                       &tx->tx_md.md_handle.fmr,
                                       &tx->tx_md.md_lkey,
                                       &tx->tx_md.md_rkey);
#else
        rc = ib_memory_register_physical (kibnal_data.kib_pd,
                                          phys, nphys,
                                          &tx->tx_md.md_addr,
                                          nob, page_offset,
                                          access,
                                          &tx->tx_md.md_handle.mr,
                                          &tx->tx_md.md_lkey,
                                          &tx->tx_md.md_rkey);
#endif
        if (rc == 0) {
                CDEBUG(D_NET, "Mapped %d pages %d bytes @ offset %d: lkey %x, rkey %x\n",
                       nphys, nob, page_offset, tx->tx_md.md_lkey, tx->tx_md.md_rkey);
                tx->tx_mapped = mapped;
        } else {
                CERROR ("Can't map phys: %d\n", rc);
                rc = -EFAULT;
        }

 out:
        LIBCFS_FREE(phys, phys_size);
        return (rc);
}
631
632 kib_conn_t *
633 kibnal_find_conn_locked (kib_peer_t *peer)
634 {
635         struct list_head *tmp;
636
637         /* just return the first connection */
638         list_for_each (tmp, &peer->ibp_conns) {
639                 return (list_entry(tmp, kib_conn_t, ibc_list));
640         }
641
642         return (NULL);
643 }
644
/* Post as many queued transmits on 'conn' as flow-control credits and
 * QP capacity allow.  First moves reserved-credit txs onto the main
 * queue, then queues a NOOP if credits must be returned to the peer (or
 * a keepalive is due) and nothing else is queued to carry them.  Called
 * whenever something changes that could let a send proceed: credits
 * arrive, a send completes, or a new tx is queued. */
void
kibnal_check_sends (kib_conn_t *conn)
{
        unsigned long   flags;
        kib_tx_t       *tx;
        int             rc;
        int             i;
        int             consume_credit;
        int             done;
        int             nwork;

        spin_lock_irqsave (&conn->ibc_lock, flags);

        LASSERT (conn->ibc_nsends_posted <= IBNAL_RX_MSGS);
        LASSERT (conn->ibc_reserved_credits >= 0);

        /* promote txs waiting on reserved (RDMA reply) credits */
        while (conn->ibc_reserved_credits > 0 &&
               !list_empty(&conn->ibc_tx_queue_rsrvd)) {
                LASSERT (conn->ibc_version !=
                         IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
                tx = list_entry(conn->ibc_tx_queue_rsrvd.next,
                                kib_tx_t, tx_list);
                list_del(&tx->tx_list);
                list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
                conn->ibc_reserved_credits--;
        }

        /* nothing queued to piggyback credits on, but credits need
         * returning (or a keepalive is due): queue a NOOP */
        if (list_empty(&conn->ibc_tx_queue) &&
            list_empty(&conn->ibc_tx_queue_nocred) &&
            (conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER ||
             kibnal_send_keepalive(conn))) {
                /* drop the lock: kibnal_get_idle_tx() takes the tx lock */
                spin_unlock_irqrestore(&conn->ibc_lock, flags);
                
                tx = kibnal_get_idle_tx();
                if (tx != NULL)
                        kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);

                spin_lock_irqsave(&conn->ibc_lock, flags);
                
                if (tx != NULL)
                        kibnal_queue_tx_locked(tx, conn);
        }

        for (;;) {
                /* no-credit queue (RDMA replies on old protocol) takes priority */
                if (!list_empty(&conn->ibc_tx_queue_nocred)) {
                        LASSERT (conn->ibc_version !=
                                 IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
                        tx = list_entry(conn->ibc_tx_queue_nocred.next,
                                        kib_tx_t, tx_list);
                        consume_credit = 0;
                } else if (!list_empty (&conn->ibc_tx_queue)) {
                        tx = list_entry (conn->ibc_tx_queue.next, 
                                         kib_tx_t, tx_list);
                        consume_credit = 1;
                } else {
                        /* nothing waiting */
                        break;
                }

                /* We rely on this for QP sizing */
                LASSERT (tx->tx_nsp > 0 && tx->tx_nsp <= 2);

                LASSERT (conn->ibc_outstanding_credits >= 0);
                LASSERT (conn->ibc_outstanding_credits <= IBNAL_MSG_QUEUE_SIZE);
                LASSERT (conn->ibc_credits >= 0);
                LASSERT (conn->ibc_credits <= IBNAL_MSG_QUEUE_SIZE);

                /* Not on ibc_rdma_queue */
                LASSERT (!tx->tx_passive_rdma_wait);

                /* QP send queue full? */
                if (conn->ibc_nsends_posted == IBNAL_RX_MSGS)
                        break;

                if (consume_credit) {
                        if (conn->ibc_credits == 0)     /* no credits */
                                break;
                
                        if (conn->ibc_credits == 1 &&   /* last credit reserved for */
                            conn->ibc_outstanding_credits == 0) /* giving back credits */
                                break;
                }
                
                list_del (&tx->tx_list);

                /* a NOOP is redundant if something else is queued to carry
                 * the credits, or no credits need returning after all */
                if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
                    (!list_empty(&conn->ibc_tx_queue) ||
                     !list_empty(&conn->ibc_tx_queue_nocred) ||
                     (conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER &&
                      !kibnal_send_keepalive(conn)))) {
                        /* redundant NOOP */
                        spin_unlock_irqrestore(&conn->ibc_lock, flags);
                        kibnal_tx_done(tx);
                        spin_lock_irqsave(&conn->ibc_lock, flags);
                        continue;
                }

                /* piggyback all outstanding credits on this message */
                kibnal_pack_msg(tx->tx_msg, conn->ibc_version,
                                conn->ibc_outstanding_credits,
                                conn->ibc_peer->ibp_nid, conn->ibc_incarnation);

                conn->ibc_outstanding_credits = 0;
                conn->ibc_nsends_posted++;
                if (consume_credit)
                        conn->ibc_credits--;

                tx->tx_sending = tx->tx_nsp;
                tx->tx_passive_rdma_wait = tx->tx_passive_rdma;
                list_add (&tx->tx_list, &conn->ibc_active_txs);

                spin_unlock_irqrestore (&conn->ibc_lock, flags);

                /* NB the gap between removing tx from the queue and sending it
                 * allows message re-ordering to occur */

                LASSERT (tx->tx_nsp > 0);

                rc = -ECONNABORTED;
                nwork = 0;
                if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                        tx->tx_status = 0;
                        /* Driver only accepts 1 item at a time */
                        for (i = 0; i < tx->tx_nsp; i++) {
                                rc = kibnal_ib_send(conn->ibc_qp, &tx->tx_sp[i]);
                                if (rc != 0)
                                        break;
                                nwork++;
                        }
                }

                conn->ibc_last_send = jiffies;

                spin_lock_irqsave (&conn->ibc_lock, flags);
                if (rc != 0) {
                        /* post failed: undo the credit/counter changes made
                         * above, fail the tx and close the connection.
                         * NB credits are transferred in the actual
                         * message, which can only be the last work item */
                        conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
                        if (consume_credit)
                                conn->ibc_credits++;
                        conn->ibc_nsends_posted--;

                        tx->tx_status = rc;
                        tx->tx_passive_rdma_wait = 0;
                        /* discount the work items that never got posted */
                        tx->tx_sending -= tx->tx_nsp - nwork;

                        done = (tx->tx_sending == 0);
                        if (done)
                                list_del (&tx->tx_list);
                        
                        spin_unlock_irqrestore (&conn->ibc_lock, flags);
                        
                        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
                                CERROR ("Error %d posting transmit to %s\n", 
                                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        else
                                CDEBUG (D_NET, "Error %d posting transmit to %s\n",
                                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));

                        kibnal_close_conn (conn, rc);

                        if (done)
                                kibnal_tx_done (tx);
                        return;
                }
                
        }

        spin_unlock_irqrestore (&conn->ibc_lock, flags);
}
813
/* IRQ-context completion handler for one send work request.  A tx may
 * comprise up to 2 work requests and may also be awaiting passive RDMA
 * completion: whoever observes the tx become idle removes it from the
 * active list and completes it.  On completion error the connection is
 * closed; on success more sends may be posted. */
void
kibnal_tx_callback (struct ib_cq_entry *e)
{
        kib_tx_t     *tx = (kib_tx_t *)kibnal_wreqid2ptr(e->work_request_id);
        kib_conn_t   *conn;
        unsigned long flags;
        int           idle;

        conn = tx->tx_conn;
        LASSERT (conn != NULL);
        LASSERT (tx->tx_sending != 0);

        spin_lock_irqsave(&conn->ibc_lock, flags);

        CDEBUG(D_NET, "conn %p tx %p [%d/%d]: %d\n", conn, tx,
               tx->tx_nsp - tx->tx_sending, tx->tx_nsp,
               e->status);

        /* I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'.  If it's
         * not me, then I take an extra ref on conn so it can't disappear
         * under me. */

        tx->tx_sending--;
        idle = (tx->tx_sending == 0) &&         /* This is the final callback */
               (!tx->tx_passive_rdma_wait);     /* Not waiting for RDMA completion */
        if (idle)
                list_del(&tx->tx_list);

        kibnal_conn_addref(conn);

        if (tx->tx_sending == 0)
                conn->ibc_nsends_posted--;

        /* record the first failure; don't overwrite an earlier one */
        if (e->status != IB_COMPLETION_STATUS_SUCCESS &&
            tx->tx_status == 0)
                tx->tx_status = -ECONNABORTED;
                
        spin_unlock_irqrestore(&conn->ibc_lock, flags);

        if (idle)
                kibnal_tx_done (tx);

        if (e->status != IB_COMPLETION_STATUS_SUCCESS) {
                CDEBUG (D_NETERROR, "Tx completion to %s failed: %d\n", 
                        libcfs_nid2str(conn->ibc_peer->ibp_nid), e->status);
                kibnal_close_conn (conn, -ENETDOWN);
        } else {
                kibnal_peer_alive(conn->ibc_peer);
                /* can I shovel some more sends out the door? */
                kibnal_check_sends(conn);
        }

        kibnal_conn_decref(conn);
}
869
870 void
871 kibnal_callback (ib_cq_t *cq, struct ib_cq_entry *e, void *arg)
872 {
873         if (kibnal_wreqid_is_rx(e->work_request_id))
874                 kibnal_rx_callback (e);
875         else
876                 kibnal_tx_callback (e);
877 }
878
879 void
880 kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob)
881 {
882         struct ib_gather_scatter *gl = &tx->tx_gl[tx->tx_nsp];
883         struct ib_send_param     *sp = &tx->tx_sp[tx->tx_nsp];
884         int                       fence;
885         int                       nob = offsetof (kib_msg_t, ibm_u) + body_nob;
886
887         LASSERT (tx->tx_nsp >= 0 && 
888                  tx->tx_nsp < sizeof(tx->tx_sp)/sizeof(tx->tx_sp[0]));
889         LASSERT (nob <= IBNAL_MSG_SIZE);
890
891         kibnal_init_msg(tx->tx_msg, type, body_nob);
892
893         /* Fence the message if it's bundled with an RDMA read */
894         fence = (tx->tx_nsp > 0) &&
895                 (type == IBNAL_MSG_PUT_DONE);
896
897         *gl = (struct ib_gather_scatter) {
898                 .address = tx->tx_vaddr,
899                 .length  = nob,
900                 .key     = kibnal_data.kib_tx_pages->ibp_lkey,
901         };
902
903         /* NB If this is an RDMA read, the completion message must wait for
904          * the RDMA to complete.  Sends wait for previous RDMA writes
905          * anyway... */
906         *sp = (struct ib_send_param) {
907                 .work_request_id      = kibnal_ptr2wreqid(tx, 0),
908                 .op                   = IB_OP_SEND,
909                 .gather_list          = gl,
910                 .num_gather_entries   = 1,
911                 .device_specific      = NULL,
912                 .solicited_event      = 1,
913                 .signaled             = 1,
914                 .immediate_data_valid = 0,
915                 .fence                = fence,
916                 .inline_data          = 0,
917         };
918
919         tx->tx_nsp++;
920 }
921
922 void
923 kibnal_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
924 {
925         unsigned long         flags;
926
927         spin_lock_irqsave(&conn->ibc_lock, flags);
928
929         kibnal_queue_tx_locked (tx, conn);
930         
931         spin_unlock_irqrestore(&conn->ibc_lock, flags);
932         
933         kibnal_check_sends(conn);
934 }
935
/* Hand 'peer' to the connection daemon to start an active connection
 * attempt.  Takes an extra ref on the peer for the connd; the connecting
 * count is bumped before the peer is queued so racing callers see the
 * attempt in progress. */
void
kibnal_schedule_active_connect_locked (kib_peer_t *peer)
{
        /* Called with exclusive kib_global_lock */

        peer->ibp_connecting++;
        kibnal_peer_addref(peer); /* extra ref for connd */
        
        spin_lock (&kibnal_data.kib_connd_lock);
        
        /* must not already be queued for the connd */
        LASSERT (list_empty(&peer->ibp_connd_list));
        list_add_tail (&peer->ibp_connd_list,
                       &kibnal_data.kib_connd_peers);
        wake_up (&kibnal_data.kib_connd_waitq);
        
        spin_unlock (&kibnal_data.kib_connd_lock);
}
953
/* Launch 'tx' towards 'nid': queue it on an existing connection if one
 * exists, otherwise (auto-)create the peer and queue the tx to be sent
 * once a connection is established.  On any failure the tx is completed
 * with an error -- the caller has committed to send. */
void
kibnal_launch_tx (kib_tx_t *tx, lnet_nid_t nid)
{
        unsigned long    flags;
        kib_peer_t      *peer;
        kib_conn_t      *conn;
        int              retry;
        int              rc;
        rwlock_t        *g_lock = &kibnal_data.kib_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */
        
        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
        LASSERT (tx->tx_nsp > 0);               /* work items have been set up */

        /* At most two passes: the second after adding a persistent peer */
        for (retry = 0; ; retry = 1) {
                read_lock_irqsave(g_lock, flags);
        
                peer = kibnal_find_peer_locked (nid);
                if (peer != NULL) {
                        conn = kibnal_find_conn_locked (peer);
                        if (conn != NULL) {
                                kibnal_conn_addref(conn); /* 1 ref for me...*/
                                read_unlock_irqrestore(g_lock, flags);
                
                                kibnal_queue_tx (tx, conn);
                                kibnal_conn_decref(conn); /* ...until here */
                                return;
                        }
                }
                
                /* Making one or more connections; I'll need a write lock... */
                /* NB irqs stay disabled across the read->write transition:
                 * read_unlock here deliberately does NOT restore irqs;
                 * the matching write_unlock_irqrestore does */
                read_unlock(g_lock);
                write_lock(g_lock);

                peer = kibnal_find_peer_locked (nid);
                if (peer != NULL)
                        break;
                
                write_unlock_irqrestore (g_lock, flags);

                /* peer didn't materialise even after adding it: give up */
                if (retry) {
                        CERROR("Can't find peer %s\n", libcfs_nid2str(nid));
                        tx->tx_status = -EHOSTUNREACH;
                        kibnal_tx_done (tx);
                        return;
                }

                rc = kibnal_add_persistent_peer(nid, LNET_NIDADDR(nid),
                                                lnet_acceptor_port());
                if (rc != 0) {
                        CERROR("Can't add peer %s: %d\n",
                               libcfs_nid2str(nid), rc);
                        tx->tx_status = rc;
                        kibnal_tx_done(tx);
                        return;
                }
        }

        /* Here with write lock held and 'peer' found */
        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kibnal_conn_addref(conn);       /* +1 ref from me... */
                write_unlock_irqrestore (g_lock, flags);
                
                kibnal_queue_tx (tx, conn);
                kibnal_conn_decref(conn);       /* ...until here */
                return;
        }

        if (peer->ibp_connecting == 0 &&
            peer->ibp_accepting == 0) {
                /* Not already connecting: respect the reconnect backoff */
                if (!(peer->ibp_reconnect_interval == 0 || /* first attempt */
                      time_after_eq(jiffies, peer->ibp_reconnect_time))) {
                        write_unlock_irqrestore (g_lock, flags);
                        tx->tx_status = -EHOSTUNREACH;
                        kibnal_tx_done (tx);
                        return;
                }
        
                kibnal_schedule_active_connect_locked(peer);
        }
        
        /* A connection is being established; queue the message... */
        list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);

        write_unlock_irqrestore (g_lock, flags);
}
1043
1044 void
1045 kibnal_txlist_done (struct list_head *txlist, int status)
1046 {
1047         kib_tx_t *tx;
1048
1049         while (!list_empty(txlist)) {
1050                 tx = list_entry (txlist->next, kib_tx_t, tx_list);
1051
1052                 list_del (&tx->tx_list);
1053                 /* complete now */
1054                 tx->tx_status = status;
1055                 kibnal_tx_done (tx);
1056         }
1057 }
1058
/* Set up a passive RDMA: map the local buffer, advertise its rkey/address
 * in a PUT_RDMA or GET_RDMA message, and launch it towards the target.
 * The peer performs the actual RDMA; this tx completes when the peer's
 * completion message arrives.  Returns 0 on success or a negative errno
 * (the tx is completed with failure internally once allocated). */
int
kibnal_start_passive_rdma (int type, lnet_msg_t *lntmsg,
                           int niov, struct iovec *iov, lnet_kiov_t *kiov,
                           int nob)
{
        lnet_nid_t  nid = lntmsg->msg_target.nid;
        kib_tx_t   *tx;
        kib_msg_t  *ibmsg;
        int         rc;
        int         access;
        
        LASSERT (type == IBNAL_MSG_PUT_RDMA || 
                 type == IBNAL_MSG_GET_RDMA);
        LASSERT (nob > 0);
        LASSERT (!in_interrupt());              /* Mapping could block */

        /* PUT: peer reads from my buffer; GET: peer writes into it */
        if (type == IBNAL_MSG_PUT_RDMA) {
                access = IB_ACCESS_REMOTE_READ;
        } else {
                access = IB_ACCESS_REMOTE_WRITE |
                         IB_ACCESS_LOCAL_WRITE;
        }

        tx = kibnal_get_idle_tx ();
        if (tx == NULL) {
                CERROR("Can't allocate %s txd for %s\n",
                       (type == IBNAL_MSG_PUT_RDMA) ? "PUT/REPLY" : "GET",
                       libcfs_nid2str(nid));
                return -ENOMEM;
        }

        
        if (iov != NULL) 
                rc = kibnal_map_iov (tx, access, niov, iov, 0, nob);
        else
                rc = kibnal_map_kiov (tx, access, niov, kiov, 0, nob);

        if (rc != 0) {
                CERROR ("Can't map RDMA for %s: %d\n", 
                        libcfs_nid2str(nid), rc);
                goto failed;
        }
        
        if (type == IBNAL_MSG_GET_RDMA) {
                /* reply gets finalized when tx completes */
                tx->tx_lntmsg[1] = lnet_create_reply_msg(kibnal_data.kib_ni, 
                                                         lntmsg);
                if (tx->tx_lntmsg[1] == NULL) {
                        CERROR ("Can't create reply for GET -> %s\n",
                                libcfs_nid2str(nid));
                        rc = -ENOMEM;
                        goto failed;
                }
        }
        
        /* tx completion is deferred until the peer's DONE message arrives */
        tx->tx_passive_rdma = 1;

        ibmsg = tx->tx_msg;

        /* Advertise the mapped buffer (key/addr/size) to the peer */
        ibmsg->ibm_u.rdma.ibrm_hdr = lntmsg->msg_hdr;
        ibmsg->ibm_u.rdma.ibrm_cookie = tx->tx_passive_rdma_cookie;
        ibmsg->ibm_u.rdma.ibrm_desc.rd_key = tx->tx_md.md_rkey;
        ibmsg->ibm_u.rdma.ibrm_desc.rd_addr = tx->tx_md.md_addr;
        ibmsg->ibm_u.rdma.ibrm_desc.rd_nob = nob;

        kibnal_init_tx_msg (tx, type, sizeof (kib_rdma_msg_t));

        CDEBUG(D_NET, "Passive: %p cookie "LPX64", key %x, addr "
               LPX64", nob %d\n",
               tx, tx->tx_passive_rdma_cookie, tx->tx_md.md_rkey,
               tx->tx_md.md_addr, nob);
        
        /* lntmsg gets finalized when tx completes. */
        tx->tx_lntmsg[0] = lntmsg;

        kibnal_launch_tx(tx, nid);
        return (0);

 failed:
        tx->tx_status = rc;
        kibnal_tx_done (tx);
        return (-EIO);
}
1142
/* Perform the active side of an RDMA advertised by a peer's GET_RDMA or
 * PUT_RDMA message 'rx': post an RDMA read/write against the peer's
 * buffer followed by a GET_DONE/PUT_DONE completion message.  If nob is
 * zero (or mapping fails), the RDMA is skipped and only the completion
 * message (carrying 'status') is sent. */
void
kibnal_start_active_rdma (int type, int status,
                          kib_rx_t *rx, lnet_msg_t *lntmsg, 
                          unsigned int niov,
                          struct iovec *iov, lnet_kiov_t *kiov,
                          int offset, int nob)
{
        kib_msg_t    *rxmsg = rx->rx_msg;
        kib_msg_t    *txmsg;
        kib_tx_t     *tx;
        int           access;
        int           rdma_op;
        int           rc;

        CDEBUG(D_NET, "type %d, status %d, niov %d, offset %d, nob %d\n",
               type, status, niov, offset, nob);

        /* Called by scheduler */
        LASSERT (!in_interrupt ());

        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        /* No data if we're completing with failure */
        LASSERT (status == 0 || nob == 0);

        LASSERT (type == IBNAL_MSG_GET_DONE ||
                 type == IBNAL_MSG_PUT_DONE);

        /* GET_DONE answers a GET with an RDMA write into the peer;
         * PUT_DONE answers a PUT with an RDMA read from the peer */
        if (type == IBNAL_MSG_GET_DONE) {
                access   = 0;
                rdma_op  = IB_OP_RDMA_WRITE;
                LASSERT (rxmsg->ibm_type == IBNAL_MSG_GET_RDMA);
        } else {
                access   = IB_ACCESS_LOCAL_WRITE;
                rdma_op  = IB_OP_RDMA_READ;
                LASSERT (rxmsg->ibm_type == IBNAL_MSG_PUT_RDMA);
        }

        tx = kibnal_get_idle_tx ();
        if (tx == NULL) {
                CERROR ("tx descs exhausted on RDMA from %s"
                        " completing locally with failure\n",
                        libcfs_nid2str(rx->rx_conn->ibc_peer->ibp_nid));
                lnet_finalize (kibnal_data.kib_ni, lntmsg, -ENOMEM);
                return;
        }
        LASSERT (tx->tx_nsp == 0);
                        
        if (nob != 0) {
                /* We actually need to transfer some data (the transfer
                 * size could get truncated to zero when the incoming
                 * message is matched) */

                if (kiov != NULL)
                        rc = kibnal_map_kiov (tx, access,
                                              niov, kiov, offset, nob);
                else
                        rc = kibnal_map_iov (tx, access,
                                             niov, iov, offset, nob);
                
                if (rc != 0) {
                        CERROR ("Can't map RDMA -> %s: %d\n", 
                                libcfs_nid2str(rx->rx_conn->ibc_peer->ibp_nid), 
                                rc);
                        /* We'll skip the RDMA and complete with failure. */
                        status = rc;
                        nob = 0;
                } else {
                        /* Work item 0: the RDMA against the peer's
                         * advertised buffer (rkey/addr from rxmsg) */
                        tx->tx_gl[0] = (struct ib_gather_scatter) {
                                .address = tx->tx_md.md_addr,
                                .length  = nob,
                                .key     = tx->tx_md.md_lkey,
                        };
                
                        tx->tx_sp[0] = (struct ib_send_param) {
                                .work_request_id      = kibnal_ptr2wreqid(tx, 0),
                                .op                   = rdma_op,
                                .gather_list          = &tx->tx_gl[0],
                                .num_gather_entries   = 1,
                                .remote_address       = rxmsg->ibm_u.rdma.ibrm_desc.rd_addr,
                                .rkey                 = rxmsg->ibm_u.rdma.ibrm_desc.rd_key,
                                .device_specific      = NULL,
                                .solicited_event      = 0,
                                .signaled             = 1,
                                .immediate_data_valid = 0,
                                .fence                = 0,
                                .inline_data          = 0,
                        };

                        tx->tx_nsp = 1;
                }
        }

        /* Next work item: the DONE message echoing the peer's cookie */
        txmsg = tx->tx_msg;

        txmsg->ibm_u.completion.ibcm_cookie = rxmsg->ibm_u.rdma.ibrm_cookie;
        txmsg->ibm_u.completion.ibcm_status = status;
        
        kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));

        if (status == 0 && nob != 0) {
                LASSERT (tx->tx_nsp > 1);
                /* RDMA: lntmsg gets finalized when the tx completes.  This
                 * is after the completion message has been sent, which in
                 * turn is after the RDMA has finished. */
                tx->tx_lntmsg[0] = lntmsg;
        } else {
                LASSERT (tx->tx_nsp == 1);
                /* No RDMA: local completion happens now! */
                CDEBUG(D_NET, "No data: immediate completion\n");
                lnet_finalize (kibnal_data.kib_ni, lntmsg,
                              status == 0 ? 0 : -EIO);
        }

        kibnal_queue_tx(tx, rx->rx_conn);
}
1260
/* LND send entry point.  Chooses a transmission strategy per message
 * type: small payloads are copied into an IMMEDIATE message; large PUTs,
 * REPLYs and GETs negotiate a passive RDMA instead.  Returns 0 on
 * success or a negative errno. */
int
kibnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        lnet_hdr_t       *hdr = &lntmsg->msg_hdr; 
        int               type = lntmsg->msg_type; 
        lnet_process_id_t target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      payload_niov = lntmsg->msg_niov; 
        struct iovec     *payload_iov = lntmsg->msg_iov; 
        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
        unsigned int      payload_offset = lntmsg->msg_offset;
        unsigned int      payload_nob = lntmsg->msg_len;
        kib_msg_t        *ibmsg;
        kib_tx_t         *tx;
        int               nob;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
               payload_nob, payload_niov, libcfs_id2str(target));

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= LNET_MAX_IOV);

        /* Thread context if we're sending payload */
        LASSERT (!in_interrupt() || payload_niov == 0);
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        switch (type) {
        default:
                LBUG();
                return (-EIO);
                
        case LNET_MSG_ACK:
                LASSERT (payload_nob == 0);
                break;

        case LNET_MSG_GET:
                /* routers can't RDMA on our behalf: send IMMEDIATE */
                if (routing || target_is_router)
                        break;                  /* send IMMEDIATE */

                /* is the REPLY message too small for RDMA? */
                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
                if (nob <= IBNAL_MSG_SIZE)
                        break;                  /* send IMMEDIATE */

                /* GET sink described by the MD: iovs or kiovs */
                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
                        return kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA, lntmsg, 
                                                         lntmsg->msg_md->md_niov, 
                                                         lntmsg->msg_md->md_iov.iov, NULL,
                                                         lntmsg->msg_md->md_length);

                return kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA, lntmsg, 
                                                 lntmsg->msg_md->md_niov, 
                                                 NULL, lntmsg->msg_md->md_iov.kiov,
                                                 lntmsg->msg_md->md_length);

        case LNET_MSG_REPLY:
        case LNET_MSG_PUT:
                /* Is the payload small enough not to need RDMA? */
                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
                if (nob <= IBNAL_MSG_SIZE)
                        break;                  /* send IMMEDIATE */
                
                return kibnal_start_passive_rdma(IBNAL_MSG_PUT_RDMA, lntmsg,
                                                 payload_niov,
                                                 payload_iov, payload_kiov,
                                                 payload_nob);
        }

        /* Send IMMEDIATE: copy the whole payload into the message buffer */

        tx = kibnal_get_idle_tx();
        if (tx == NULL) {
                CERROR ("Can't send %d to %s: tx descs exhausted%s\n", 
                        type, libcfs_nid2str(target.nid), 
                        in_interrupt() ? " (intr)" : "");
                return (-ENOMEM);
        }

        ibmsg = tx->tx_msg;
        ibmsg->ibm_u.immediate.ibim_hdr = *hdr;

        if (payload_kiov != NULL)
                lnet_copy_kiov2flat(IBNAL_MSG_SIZE, ibmsg,
                                    offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                    payload_niov, payload_kiov, 
                                    payload_offset, payload_nob);
        else
                lnet_copy_iov2flat(IBNAL_MSG_SIZE, ibmsg,
                                   offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                   payload_niov, payload_iov, 
                                   payload_offset, payload_nob);

        kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE,
                            offsetof(kib_immediate_msg_t, 
                                     ibim_payload[payload_nob]));

        /* lntmsg gets finalized when tx completes */
        tx->tx_lntmsg[0] = lntmsg;

        kibnal_launch_tx(tx, target.nid);
        return (0);
}
1367
1368 int
1369 kibnal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
1370                    void **new_private)
1371 {
1372         kib_rx_t    *rx = private;
1373         kib_conn_t  *conn = rx->rx_conn;
1374
1375         if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {
1376                 /* Can't block if RDMA completions need normal credits */
1377                 LCONSOLE_ERROR_MSG(0x12a, 
1378                                "Dropping message from %s: no buffers free. "
1379                                "%s is running an old version of LNET that may "
1380                                "deadlock if messages wait for buffers)\n",
1381                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
1382                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
1383                 return -EDEADLK;
1384         }
1385         
1386         *new_private = private;
1387         return 0;
1388 }
1389
/* LND recv entry point.  Delivers the payload of the rx message that
 * matched 'lntmsg': IMMEDIATE payloads are copied out of the message
 * buffer; GET/PUT_RDMA messages trigger the active side of the RDMA.
 * Always re-posts the rx buffer before returning. */
int
kibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
             int delayed, unsigned int niov,
             struct iovec *iov, lnet_kiov_t *kiov,
             unsigned int offset, unsigned int mlen, unsigned int rlen)
{
        kib_rx_t    *rx = private;
        kib_msg_t   *rxmsg = rx->rx_msg;
        int          msg_nob;
        int          rc = 0;
        
        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt ());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch (rxmsg->ibm_type) {
        default:
                LBUG();         /* LBUG never returns; no fallthrough */

        case IBNAL_MSG_IMMEDIATE:
                /* sanity check the sender's claimed payload size */
                msg_nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
                if (msg_nob > rx->rx_nob) {
                        CERROR ("Immediate message from %s too big: %d(%d)\n",
                                libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
                                msg_nob, rx->rx_nob);
                        rc = -EPROTO;
                        break;
                }

                if (kiov != NULL)
                        lnet_copy_flat2kiov(
                                niov, kiov, offset, 
                                IBNAL_MSG_SIZE, rxmsg,
                                offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                mlen);
                else
                        lnet_copy_flat2iov(
                                niov, iov, offset,
                                IBNAL_MSG_SIZE, rxmsg,
                                offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                mlen);

                lnet_finalize (ni, lntmsg, 0);
                break;

        case IBNAL_MSG_GET_RDMA:
                if (lntmsg != NULL) {
                        /* GET matched: RDMA lntmsg's payload */
                        kibnal_start_active_rdma(IBNAL_MSG_GET_DONE, 0,
                                                 rx, lntmsg, 
                                                 lntmsg->msg_niov, 
                                                 lntmsg->msg_iov, 
                                                 lntmsg->msg_kiov,
                                                 lntmsg->msg_offset, 
                                                 lntmsg->msg_len);
                } else {
                        /* GET didn't match anything */
                        kibnal_start_active_rdma (IBNAL_MSG_GET_DONE, -ENODATA,
                                                  rx, NULL, 0, NULL, NULL, 0, 0);
                }
                break;

        case IBNAL_MSG_PUT_RDMA:
                /* fetch the peer's payload with an RDMA read */
                kibnal_start_active_rdma (IBNAL_MSG_PUT_DONE, 0, rx, lntmsg,
                                          niov, iov, kiov, offset, mlen);
                break;
        }

        kibnal_post_rx(rx, 1, 0);
        return rc;
}
1462
1463 int
1464 kibnal_thread_start (int (*fn)(void *arg), void *arg)
1465 {
1466         long    pid = kernel_thread (fn, arg, 0);
1467
1468         if (pid < 0)
1469                 return ((int)pid);
1470
1471         atomic_inc (&kibnal_data.kib_nthreads);
1472         return (0);
1473 }
1474
/* Called by each thread started via kibnal_thread_start() as it exits;
 * drops the live-thread count so shutdown can wait for zero. */
void
kibnal_thread_fini (void)
{
        atomic_dec (&kibnal_data.kib_nthreads);
}
1480
/* Note that 'peer' has just shown signs of life (successful completion
 * traffic).  Lockless by design -- see comment below; the barrier makes
 * the timestamp visible before any subsequent state changes. */
void
kibnal_peer_alive (kib_peer_t *peer)
{
        /* This is racy, but everyone's only writing cfs_time_current() */
        peer->ibp_last_alive = cfs_time_current();
        mb();
}
1488
/* If 'peer' has died completely (no conns, no connection attempts in
 * flight, and a pending error), snapshot and clear the error under the
 * global lock, then tell LNET the peer is down along with when it was
 * last known alive. */
void
kibnal_peer_notify (kib_peer_t *peer)
{
        time_t        last_alive = 0;
        int           error = 0;
        unsigned long flags;
        
        read_lock_irqsave(&kibnal_data.kib_global_lock, flags);

        if (list_empty(&peer->ibp_conns) &&
            peer->ibp_accepting == 0 &&
            peer->ibp_connecting == 0 &&
            peer->ibp_error != 0) {
                error = peer->ibp_error;
                peer->ibp_error = 0;    /* consume the error: notify once */
                /* convert jiffies-based age to wall-clock seconds */
                last_alive = cfs_time_current_sec() -
                             cfs_duration_sec(cfs_time_current() -
                                              peer->ibp_last_alive);
        }
        
        read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
        
        /* notify outside the lock; 0 == peer is dead */
        if (error != 0)
                lnet_notify(kibnal_data.kib_ni, peer->ibp_nid, 0, last_alive);
}
1514
/* Begin closing 'conn': do the immediate bookkeeping and hand the conn
 * to the reaper for teardown.  NB the conn's ref moves (or a new ref is
 * taken) onto the reaper list -- see comments below. */
void
kibnal_close_conn_locked (kib_conn_t *conn, int error)
{
        /* This just does the immmediate housekeeping, and schedules the
         * connection for the reaper to finish off.
         * Caller holds kib_global_lock exclusively in irq context */
        kib_peer_t   *peer = conn->ibc_peer;

        CDEBUG (error == 0 ? D_NET : D_NETERROR,
                "closing conn to %s: error %d\n", 
                libcfs_nid2str(peer->ibp_nid), error);
        
        LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED ||
                 conn->ibc_state == IBNAL_CONN_CONNECTING);

        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                /* kib_reaper_conns takes ibc_list's ref */
                list_del (&conn->ibc_list);
        } else {
                /* new ref for kib_reaper_conns */
                kibnal_conn_addref(conn);
        }
        
        if (list_empty (&peer->ibp_conns)) {   /* no more conns */
                if (peer->ibp_persistence == 0 && /* non-persistent peer */
                    kibnal_peer_active(peer))     /* still in peer table */
                        kibnal_unlink_peer_locked (peer);

                peer->ibp_error = error; /* set/clear error on last conn */
        }

        conn->ibc_state = IBNAL_CONN_DEATHROW;

        /* Schedule conn for closing/destruction */
        spin_lock (&kibnal_data.kib_reaper_lock);

        list_add_tail (&conn->ibc_list, &kibnal_data.kib_reaper_conns);
        wake_up (&kibnal_data.kib_reaper_waitq);
                
        spin_unlock (&kibnal_data.kib_reaper_lock);
}
1556
1557 int
1558 kibnal_close_conn (kib_conn_t *conn, int why)
1559 {
1560         unsigned long     flags;
1561         int               count = 0;
1562
1563         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1564
1565         LASSERT (conn->ibc_state >= IBNAL_CONN_CONNECTING);
1566         
1567         if (conn->ibc_state <= IBNAL_CONN_ESTABLISHED) {
1568                 count = 1;
1569                 kibnal_close_conn_locked (conn, why);
1570         }
1571         
1572         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1573         return (count);
1574 }
1575
/* Record a failed connection attempt (active or passive) on 'peer'.
 * When this was the last attempt in flight and no connections exist,
 * apply exponential reconnect backoff, reap the peer's queued transmits
 * and complete them with -EHOSTUNREACH, and notify LNET. */
void
kibnal_peer_connect_failed (kib_peer_t *peer, int active, int error)
{
        LIST_HEAD        (zombies);
        unsigned long     flags;

        LASSERT(error != 0);

        write_lock_irqsave (&kibnal_data.kib_global_lock, flags);

        if (active) {
                LASSERT (peer->ibp_connecting != 0);
                peer->ibp_connecting--;
        } else {
                LASSERT (peer->ibp_accepting != 0);
                peer->ibp_accepting--;
        }

        if (peer->ibp_connecting != 0 ||
            peer->ibp_accepting != 0) {
                /* another connection attempt under way... */
                write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
                return;
        }

        if (list_empty(&peer->ibp_conns)) {
                /* Say when active connection can be re-attempted */
                /* exponential backoff, clamped to [min, max] tunables */
                peer->ibp_reconnect_interval *= 2;
                peer->ibp_reconnect_interval =
                        MAX(peer->ibp_reconnect_interval,
                            *kibnal_tunables.kib_min_reconnect_interval);
                peer->ibp_reconnect_interval =
                        MIN(peer->ibp_reconnect_interval,
                            *kibnal_tunables.kib_max_reconnect_interval);
                
                peer->ibp_reconnect_time = jiffies + 
                                           peer->ibp_reconnect_interval * HZ;
        
                /* Take peer's blocked transmits; I'll complete
                 * them with error */
                /* splice idiom: insert 'zombies' into the queue, then
                 * unlink the old head, leaving the txs on 'zombies' */
                list_add(&zombies, &peer->ibp_tx_queue);
                list_del_init(&peer->ibp_tx_queue);
                
                if (kibnal_peer_active(peer) &&
                    (peer->ibp_persistence == 0)) {
                        /* failed connection attempt on non-persistent peer */
                        kibnal_unlink_peer_locked (peer);
                }

                peer->ibp_error = error;
        } else {
                /* Can't have blocked transmits if there are connections */
                LASSERT (list_empty(&peer->ibp_tx_queue));
        }
        
        write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

        kibnal_peer_notify(peer);
        
        if (!list_empty (&zombies))
                CDEBUG (D_NETERROR, "Deleting messages for %s: connection failed\n",
                        libcfs_nid2str(peer->ibp_nid));

        kibnal_txlist_done(&zombies, -EHOSTUNREACH);
}
1641
/* Complete a connection attempt ('active' selects active vs passive) with
 * 'status'.  On success (status == 0, peer still in the peer table) the conn
 * becomes ESTABLISHED: it joins the peer's conn list, blocked transmits are
 * queued on it and all receive buffers are posted.  Otherwise the conn is
 * put on the path to destruction and the peer is told the attempt failed. */
void
kibnal_connreq_done (kib_conn_t *conn, int active, int status)
{
        int               state = conn->ibc_state;
        kib_peer_t       *peer = conn->ibc_peer;
        kib_tx_t         *tx;
        unsigned long     flags;
        int               rc;
        int               i;

        /* connection-request scratch state is no longer needed */
        if (conn->ibc_connreq != NULL) {
                LIBCFS_FREE (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
                conn->ibc_connreq = NULL;
        }

        switch (state) {
        case IBNAL_CONN_CONNECTING:
                /* conn has a CM comm_id */
                if (status == 0) {
                        /* Install common (active/passive) callback for
                         * disconnect/idle notification */
                        rc = tsIbCmCallbackModify(conn->ibc_comm_id, 
                                                  kibnal_conn_callback,
                                                  conn);
                        LASSERT (rc == 0);
                } else {
                        /* LASSERT (no more CM callbacks) */
                        rc = tsIbCmCallbackModify(conn->ibc_comm_id,
                                                  kibnal_bad_conn_callback,
                                                  conn);
                        LASSERT (rc == 0);
                }
                break;
                
        case IBNAL_CONN_INIT_QP:
                /* never got as far as the CM; only legal on failure */
                LASSERT (status != 0);
                break;
                
        default:
                LBUG();
        }
        
        write_lock_irqsave (&kibnal_data.kib_global_lock, flags);

        if (active)
                LASSERT (peer->ibp_connecting != 0);
        else
                LASSERT (peer->ibp_accepting != 0);
        
        if (status == 0 &&                      /* connection established */
            kibnal_peer_active(peer)) {         /* peer not deleted */

                /* attempt completed; drop the in-flight counter */
                if (active)
                        peer->ibp_connecting--;
                else
                        peer->ibp_accepting--;

                conn->ibc_last_send = jiffies;
                conn->ibc_state = IBNAL_CONN_ESTABLISHED;
                kibnal_peer_alive(peer);

                /* +1 ref for ibc_list; caller(== CM)'s ref remains until
                 * the IB_CM_IDLE callback */
                kibnal_conn_addref(conn);
                list_add (&conn->ibc_list, &peer->ibp_conns);

                peer->ibp_reconnect_interval = 0; /* OK to reconnect at any time */

                /* post blocked sends to the new connection */
                spin_lock (&conn->ibc_lock);
                
                while (!list_empty (&peer->ibp_tx_queue)) {
                        tx = list_entry (peer->ibp_tx_queue.next, 
                                         kib_tx_t, tx_list);
                        
                        list_del (&tx->tx_list);

                        kibnal_queue_tx_locked (tx, conn);
                }
                
                spin_unlock (&conn->ibc_lock);

                /* Nuke any dangling conns from a different peer instance... */
                kibnal_close_stale_conns_locked (conn->ibc_peer,
                                                 conn->ibc_incarnation);

                write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

                /* queue up all the receives */
                for (i = 0; i < IBNAL_RX_MSGS; i++) {
                        /* +1 ref for rx desc */
                        kibnal_conn_addref(conn);

                        CDEBUG(D_NET, "RX[%d] %p->%p - "LPX64"\n",
                               i, &conn->ibc_rxs[i], conn->ibc_rxs[i].rx_msg,
                               conn->ibc_rxs[i].rx_vaddr);

                        kibnal_post_rx (&conn->ibc_rxs[i], 0, 0);
                }

                kibnal_check_sends (conn);
                return;
        }

        if (status == 0) {
                /* connection established, but peer was deleted.  Schedule for
                 * reaper to cm_disconnect... */
                status = -ECONNABORTED;
                kibnal_close_conn_locked (conn, status);
        } else {
                /* just waiting for refs to drain */
                conn->ibc_state = IBNAL_CONN_ZOMBIE;
        } 

        write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

        /* NB this decrements ibp_connecting/ibp_accepting */
        kibnal_peer_connect_failed (conn->ibc_peer, active, status);
}
1760
/* Handle a passive connection request arriving on CM comm_id 'cid'.
 * Validates the CONNREQ message ('msg', 'nob' bytes), creates a conn and
 * (if the source NID is new) a peer, checks for shutdown/stale instance,
 * resolves a simultaneous-connect race in favour of the higher NID, and
 * returns the new conn (state CONNECTING) via *connp.
 * Returns 0 on success or a -ve errno. */
int
kibnal_accept_connreq (kib_conn_t **connp, tTS_IB_CM_COMM_ID cid,
                       kib_msg_t *msg, int nob)
{
        kib_conn_t    *conn;
        kib_peer_t    *peer;
        kib_peer_t    *peer2;
        unsigned long  flags;
        int            rc;

        rc = kibnal_unpack_msg(msg, 0, nob);
        if (rc != 0) {
                CERROR("Can't unpack connreq msg: %d\n", rc);
                return -EPROTO;
        }

        CDEBUG(D_NET, "connreq from %s\n", libcfs_nid2str(msg->ibm_srcnid));

        if (msg->ibm_type != IBNAL_MSG_CONNREQ) {
                CERROR("Unexpected connreq msg type: %x from %s\n",
                       msg->ibm_type, libcfs_nid2str(msg->ibm_srcnid));
                return -EPROTO;
        }
                
        /* both sides must agree on the (compile-time) queue depth */
        if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
                CERROR("Can't accept %s: bad queue depth %d (%d expected)\n",
                       libcfs_nid2str(msg->ibm_srcnid), 
                       msg->ibm_u.connparams.ibcp_queue_depth, 
                       IBNAL_MSG_QUEUE_SIZE);
                return (-EPROTO);
        }
        
        conn = kibnal_create_conn();
        if (conn == NULL)
                return (-ENOMEM);

        /* assume 'nid' is a new peer */
        rc = kibnal_create_peer(&peer, msg->ibm_srcnid);
        if (rc != 0) {
                kibnal_conn_decref(conn);
                return (-ENOMEM);
        }
        
        write_lock_irqsave (&kibnal_data.kib_global_lock, flags);

        if (kibnal_data.kib_nonewpeers) {
                write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
                
                CERROR ("Shutdown has started, drop connreq from %s\n",
                        libcfs_nid2str(msg->ibm_srcnid));
                kibnal_conn_decref(conn);
                kibnal_peer_decref(peer);
                return -ESHUTDOWN;
        }

        /* Check I'm the same instance that gave the connection parameters.  
         * NB If my incarnation changes after this, the peer will get nuked and
         * we'll spot that when the connection is finally added into the peer's
         * connlist */
        if (!lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
                                     msg->ibm_dstnid) ||
            msg->ibm_dststamp != kibnal_data.kib_incarnation) {
                write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
                
                CERROR("Stale connection params from %s\n",
                       libcfs_nid2str(msg->ibm_srcnid));
                kibnal_conn_decref(conn);
                kibnal_peer_decref(peer);
                return -ESTALE;
        }

        peer2 = kibnal_find_peer_locked(msg->ibm_srcnid);
        if (peer2 == NULL) {
                /* Brand new peer */
                LASSERT (peer->ibp_accepting == 0);

                /* peer table takes my ref on peer */
                list_add_tail (&peer->ibp_list,
                               kibnal_nid2peerlist(msg->ibm_srcnid));
        } else {
                /* tie-break connection race in favour of the higher NID */                
                if (peer2->ibp_connecting != 0 &&
                    msg->ibm_srcnid < kibnal_data.kib_ni->ni_nid) {
                        write_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                                flags);
                        CWARN("Conn race %s\n",
                              libcfs_nid2str(peer2->ibp_nid));

                        kibnal_conn_decref(conn);
                        kibnal_peer_decref(peer);
                        return -EALREADY;
                }

                /* use the existing peer; discard the speculative one */
                kibnal_peer_decref(peer);
                peer = peer2;
        }

        /* +1 ref for conn */
        kibnal_peer_addref(peer);
        peer->ibp_accepting++;

        write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

        conn->ibc_peer = peer;
        conn->ibc_state = IBNAL_CONN_CONNECTING;
        conn->ibc_comm_id = cid;
        conn->ibc_incarnation = msg->ibm_srcstamp;
        conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
        conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
        conn->ibc_version = msg->ibm_version;

        *connp = conn;
        return (0);
}
1875
1876 tTS_IB_CM_CALLBACK_RETURN
1877 kibnal_bad_conn_callback (tTS_IB_CM_EVENT event,
1878                           tTS_IB_CM_COMM_ID cid,
1879                           void *param,
1880                           void *arg)
1881 {
1882         CERROR ("Unexpected event %d: conn %p\n", event, arg);
1883         LBUG ();
1884         return TS_IB_CM_CALLBACK_PROCEED;
1885 }
1886
1887 void
1888 kibnal_abort_txs (kib_conn_t *conn, struct list_head *txs)
1889 {
1890         LIST_HEAD        (zombies); 
1891         struct list_head *tmp;
1892         struct list_head *nxt;
1893         kib_tx_t         *tx;
1894         unsigned long     flags;
1895
1896         spin_lock_irqsave (&conn->ibc_lock, flags);
1897
1898         list_for_each_safe (tmp, nxt, txs) {
1899                 tx = list_entry (tmp, kib_tx_t, tx_list);
1900
1901                 if (txs == &conn->ibc_active_txs) {
1902                         LASSERT (tx->tx_passive_rdma ||
1903                                  !tx->tx_passive_rdma_wait);
1904
1905                         LASSERT (tx->tx_passive_rdma_wait ||
1906                                  tx->tx_sending != 0);
1907                 } else {
1908                         LASSERT (!tx->tx_passive_rdma_wait);
1909                         LASSERT (tx->tx_sending == 0);
1910                 }
1911
1912                 tx->tx_status = -ECONNABORTED;
1913                 tx->tx_passive_rdma_wait = 0;
1914
1915                 if (tx->tx_sending == 0) {
1916                         list_del (&tx->tx_list);
1917                         list_add (&tx->tx_list, &zombies);
1918                 }
1919         }
1920         
1921         spin_unlock_irqrestore (&conn->ibc_lock, flags);
1922
1923         kibnal_txlist_done (&zombies, -ECONNABORTED);
1924 }
1925
/* Common CM callback for an established connection (installed by
 * kibnal_connreq_done).  DISCONNECTED and unexpected events close the
 * conn; IDLE means the CM has finished with it, so outstanding
 * transmits are aborted and the CM's conn ref is dropped. */
tTS_IB_CM_CALLBACK_RETURN
kibnal_conn_callback (tTS_IB_CM_EVENT event,
                      tTS_IB_CM_COMM_ID cid,
                      void *param,
                      void *arg)
{
        kib_conn_t       *conn = arg;
        int               rc;

        /* Established Connection Notifier */

        switch (event) {
        default:
                /* anything unrecognised is treated as an error */
                CDEBUG(D_NETERROR, "Connection %p -> %s ERROR %d\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), event);
                kibnal_close_conn (conn, -ECONNABORTED);
                break;
                
        case TS_IB_CM_DISCONNECTED:
                CDEBUG(D_NETERROR, "Connection %p -> %s DISCONNECTED.\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                kibnal_close_conn (conn, 0);
                break;

        case TS_IB_CM_IDLE:
                CDEBUG(D_NET, "Connection %p -> %s IDLE.\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));

                /* LASSERT (no further callbacks) */
                rc = tsIbCmCallbackModify(cid, kibnal_bad_conn_callback, conn);
                LASSERT (rc == 0);

                /* NB we wait until the connection has closed before
                 * completing outstanding passive RDMAs so we can be sure
                 * the network can't touch the mapped memory any more. */

                kibnal_abort_txs(conn, &conn->ibc_tx_queue);
                kibnal_abort_txs(conn, &conn->ibc_tx_queue_rsrvd);
                kibnal_abort_txs(conn, &conn->ibc_tx_queue_nocred);
                kibnal_abort_txs(conn, &conn->ibc_active_txs);
                
                kibnal_conn_decref(conn);        /* Lose CM's ref */
                break;
        }

        return TS_IB_CM_CALLBACK_PROCEED;
}
1973
/* CM callback driving the passive (accepting) side of connection
 * establishment.  First invoked with arg == NULL for REQ_RECEIVED; once
 * a conn has been created, tsIbCmCallbackModify makes 'arg' the conn
 * for subsequent events.  Returning PROCEED from REQ_RECEIVED hands my
 * conn ref to the CM; ABORT paths drop it explicitly. */
tTS_IB_CM_CALLBACK_RETURN
kibnal_passive_conn_callback (tTS_IB_CM_EVENT event,
                              tTS_IB_CM_COMM_ID cid,
                              void *param,
                              void *arg)
{
        kib_conn_t  *conn = arg;
        int          rc;
        
        switch (event) {
        default:
                if (conn == NULL) {
                        /* no connection yet */
                        CERROR ("Unexpected event: %d\n", event);
                        return TS_IB_CM_CALLBACK_ABORT;
                }
                
                /* IDLE (or anything else) before ESTABLISHED: the
                 * connection attempt has failed */
                CERROR ("%s event %p -> %s: %d\n",
                        (event == TS_IB_CM_IDLE) ? "IDLE" : "Unexpected",
                        conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), event);
                kibnal_connreq_done(conn, 0, -ECONNABORTED);
                kibnal_conn_decref(conn); /* drop CM's ref */
                return TS_IB_CM_CALLBACK_ABORT;
                
        case TS_IB_CM_REQ_RECEIVED: {
                struct ib_cm_req_received_param *req = param;
                kib_msg_t                       *msg = req->remote_private_data;

                LASSERT (conn == NULL);

                /* Don't really know srcnid until successful unpack */
                CDEBUG(D_NET, "REQ from ?%s?\n", libcfs_nid2str(msg->ibm_srcnid));

                rc = kibnal_accept_connreq(&conn, cid, msg, 
                                           req->remote_private_data_len);
                if (rc != 0) {
                        CERROR ("Can't accept ?%s?: %d\n",
                                libcfs_nid2str(msg->ibm_srcnid), rc);
                        return TS_IB_CM_CALLBACK_ABORT;
                }

                /* update 'arg' for next callback */
                rc = tsIbCmCallbackModify(cid, kibnal_passive_conn_callback, conn);
                LASSERT (rc == 0);

                /* build the CONNACK reply in the CM's private data buffer */
                msg = req->accept_param.reply_private_data;
                kibnal_init_msg(msg, IBNAL_MSG_CONNACK,
                                sizeof(msg->ibm_u.connparams));

                msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;

                kibnal_pack_msg(msg, conn->ibc_version, 0, 
                                conn->ibc_peer->ibp_nid, 
                                conn->ibc_incarnation);

                req->accept_param.qp                     = conn->ibc_qp;
                req->accept_param.reply_private_data_len = msg->ibm_nob;
                req->accept_param.responder_resources    = IBNAL_RESPONDER_RESOURCES;
                req->accept_param.initiator_depth        = IBNAL_RESPONDER_RESOURCES;
                req->accept_param.rnr_retry_count        = IBNAL_RNR_RETRY;
                req->accept_param.flow_control           = IBNAL_FLOW_CONTROL;

                CDEBUG(D_NET, "Proceeding\n");
                return TS_IB_CM_CALLBACK_PROCEED; /* CM takes my ref on conn */
        }

        case TS_IB_CM_ESTABLISHED:
                LASSERT (conn != NULL);
                CWARN("Connection %p -> %s ESTABLISHED.\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));

                /* success: connreq_done installs kibnal_conn_callback */
                kibnal_connreq_done(conn, 0, 0);
                return TS_IB_CM_CALLBACK_PROCEED;
        }
}
2049
/* CM callback driving the active (connecting) side of connection
 * establishment.  Validates the CONNACK reply at REP_RECEIVED,
 * completes the connection at ESTABLISHED, and retries or fails the
 * attempt otherwise.  Every ABORT path drops the CM's conn ref. */
tTS_IB_CM_CALLBACK_RETURN
kibnal_active_conn_callback (tTS_IB_CM_EVENT event,
                             tTS_IB_CM_COMM_ID cid,
                             void *param,
                             void *arg)
{
        kib_conn_t    *conn = arg;
        unsigned long  flags;

        switch (event) {
        case TS_IB_CM_REP_RECEIVED: {
                struct ib_cm_rep_received_param *rep = param;
                kib_msg_t                       *msg = rep->remote_private_data;
                int                              nob = rep->remote_private_data_len;
                int                              rc;

                rc = kibnal_unpack_msg(msg, conn->ibc_version, nob);
                if (rc != 0) {
                        CERROR ("Error %d unpacking conn ack from %s\n",
                                rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        kibnal_connreq_done(conn, 1, rc);
                        kibnal_conn_decref(conn); /* drop CM's ref */
                        return TS_IB_CM_CALLBACK_ABORT;
                }

                if (msg->ibm_type != IBNAL_MSG_CONNACK) {
                        CERROR ("Unexpected conn ack type %d from %s\n",
                                msg->ibm_type, 
                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        kibnal_connreq_done(conn, 1, -EPROTO);
                        kibnal_conn_decref(conn); /* drop CM's ref */
                        return TS_IB_CM_CALLBACK_ABORT;
                }

                /* reply must come from the peer/instance I connected to,
                 * and must be addressed to this instance of me */
                if (!lnet_ptlcompat_matchnid(conn->ibc_peer->ibp_nid,
                                             msg->ibm_srcnid) ||
                    !lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
                                             msg->ibm_dstnid) ||
                    msg->ibm_srcstamp != conn->ibc_incarnation ||
                    msg->ibm_dststamp != kibnal_data.kib_incarnation) {
                        CERROR("Stale conn ack from %s\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        kibnal_connreq_done(conn, 1, -ESTALE);
                        kibnal_conn_decref(conn); /* drop CM's ref */
                        return TS_IB_CM_CALLBACK_ABORT;
                }

                if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
                        CERROR ("Bad queue depth %d from %s\n",
                                msg->ibm_u.connparams.ibcp_queue_depth,
                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        kibnal_connreq_done(conn, 1, -EPROTO);
                        kibnal_conn_decref(conn); /* drop CM's ref */
                        return TS_IB_CM_CALLBACK_ABORT;
                }
                                
                CDEBUG(D_NET, "Connection %p -> %s REP_RECEIVED.\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));

                conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
                conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
                return TS_IB_CM_CALLBACK_PROCEED;
        }

        case TS_IB_CM_ESTABLISHED:
                CWARN("Connection %p -> %s ESTABLISHED\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));

                /* success: connreq_done installs kibnal_conn_callback */
                kibnal_connreq_done(conn, 1, 0);
                return TS_IB_CM_CALLBACK_PROCEED;

        case TS_IB_CM_IDLE:
                CDEBUG(D_NETERROR, "Connection %p -> %s IDLE\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                /* I assume this connection attempt was rejected because the
                 * peer found a stale QP; I'll just try again */
                write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
                kibnal_schedule_active_connect_locked(conn->ibc_peer);
                write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);

                kibnal_connreq_done(conn, 1, -ECONNABORTED);
                kibnal_conn_decref(conn); /* drop CM's ref */
                return TS_IB_CM_CALLBACK_ABORT;

        default:
                CDEBUG(D_NETERROR, "Connection %p -> %s ERROR %d\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), event);
                kibnal_connreq_done(conn, 1, -ECONNABORTED);
                kibnal_conn_decref(conn); /* drop CM's ref */
                return TS_IB_CM_CALLBACK_ABORT;
        }
}
2142
/* Completion callback for the IB path record lookup started by
 * kibnal_connect_peer().  On success, builds the CONNREQ message and CM
 * connect parameters and kicks off ib_cm_connect(); on any failure the
 * connection attempt is completed with an error.  Always returns
 * non-zero to suppress further path-record callbacks. */
int
kibnal_pathreq_callback (tTS_IB_CLIENT_QUERY_TID tid, int status,
                          struct ib_path_record *resp, int remaining,
                          void *arg)
{
        kib_conn_t *conn = arg;
        kib_peer_t *peer = conn->ibc_peer;
        kib_msg_t  *msg = &conn->ibc_connreq->cr_msg;

        if (status != 0) {
                CDEBUG (D_NETERROR, "Pathreq %p -> %s failed: %d\n",
                        conn, libcfs_nid2str(peer->ibp_nid), status);
                kibnal_connreq_done(conn, 1, status);
                kibnal_conn_decref(conn); /* drop callback's ref */
                return 1;    /* non-zero prevents further callbacks */
        }

        /* keep a copy of the resolved path for ib_cm_connect() below */
        conn->ibc_connreq->cr_path = *resp;

        kibnal_init_msg(msg, IBNAL_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
        msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
        kibnal_pack_msg(msg, conn->ibc_version, 0, 
                        peer->ibp_nid, conn->ibc_incarnation);

        conn->ibc_connreq->cr_connparam = (struct ib_cm_active_param) {
                .qp                   = conn->ibc_qp,
                .req_private_data     = msg,
                .req_private_data_len = msg->ibm_nob,
                .responder_resources  = IBNAL_RESPONDER_RESOURCES,
                .initiator_depth      = IBNAL_RESPONDER_RESOURCES,
                .retry_count          = IBNAL_RETRY,
                .rnr_retry_count      = IBNAL_RNR_RETRY,
                .cm_response_timeout  = *kibnal_tunables.kib_timeout,
                .max_cm_retries       = IBNAL_CM_RETRY,
                .flow_control         = IBNAL_FLOW_CONTROL,
        };

        /* XXX set timeout just like SDP!!!*/
        conn->ibc_connreq->cr_path.packet_life = 13;
        
        /* Flag I'm getting involved with the CM... */
        conn->ibc_state = IBNAL_CONN_CONNECTING;

        CDEBUG(D_NET, "Connecting to, service id "LPX64", on %s\n",
               conn->ibc_connreq->cr_svcrsp.ibsr_svc_id, 
               libcfs_nid2str(peer->ibp_nid));

        /* kibnal_connect_callback gets my conn ref */
        status = ib_cm_connect (&conn->ibc_connreq->cr_connparam, 
                                &conn->ibc_connreq->cr_path, NULL,
                                conn->ibc_connreq->cr_svcrsp.ibsr_svc_id, 0,
                                kibnal_active_conn_callback, conn,
                                &conn->ibc_comm_id);
        if (status != 0) {
                CERROR ("Connect %p -> %s failed: %d\n",
                        conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), status);
                /* Back out state change: I've not got a CM comm_id yet... */
                conn->ibc_state = IBNAL_CONN_INIT_QP;
                kibnal_connreq_done(conn, 1, status);
                kibnal_conn_decref(conn); /* Drop callback's ref */
        }
        
        return 1;    /* non-zero to prevent further callbacks */
}
2207
/* Start an active connection attempt to 'peer': allocate a conn and its
 * connreq scratchpad, query the peer's IB service, then launch an async
 * path record lookup whose callback (kibnal_pathreq_callback) continues
 * the handshake.  Failures complete the attempt via
 * kibnal_connreq_done() / kibnal_peer_connect_failed(). */
void
kibnal_connect_peer (kib_peer_t *peer)
{
        kib_conn_t  *conn;
        int          rc;

        conn = kibnal_create_conn();
        if (conn == NULL) {
                CERROR ("Can't allocate conn\n");
                kibnal_peer_connect_failed (peer, 1, -ENOMEM);
                return;
        }

        conn->ibc_peer = peer;
        kibnal_peer_addref(peer);

        LIBCFS_ALLOC (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
        if (conn->ibc_connreq == NULL) {
                CERROR ("Can't allocate connreq\n");
                kibnal_connreq_done(conn, 1, -ENOMEM);
                kibnal_conn_decref(conn); /* drop my ref */
                return;
        }

        memset(conn->ibc_connreq, 0, sizeof (*conn->ibc_connreq));

        /* synchronous query of the peer's service (svc id/gid/pkey) */
        rc = kibnal_make_svcqry(conn);
        if (rc != 0) {
                kibnal_connreq_done (conn, 1, rc);
                kibnal_conn_decref(conn); /* drop my ref */
                return;
        }

        /* my local GID, needed as the path record source */
        rc = ib_cached_gid_get(kibnal_data.kib_device,
                               kibnal_data.kib_port, 0,
                               conn->ibc_connreq->cr_gid);
        LASSERT (rc == 0);

        /* kibnal_pathreq_callback gets my conn ref */
        rc = tsIbPathRecordRequest (kibnal_data.kib_device,
                                    kibnal_data.kib_port,
                                    conn->ibc_connreq->cr_gid,
                                    conn->ibc_connreq->cr_svcrsp.ibsr_svc_gid,
                                    conn->ibc_connreq->cr_svcrsp.ibsr_svc_pkey,
                                    0,
                                    *kibnal_tunables.kib_timeout * HZ,
                                    0,
                                    kibnal_pathreq_callback, conn, 
                                    &conn->ibc_connreq->cr_tid);
        if (rc == 0)
                return; /* callback now has my ref on conn */

        CERROR ("Path record request %p -> %s failed: %d\n",
                conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
        kibnal_connreq_done(conn, 1, rc);
        kibnal_conn_decref(conn); /* drop my ref */
}
2265
2266 int
2267 kibnal_check_txs (kib_conn_t *conn, struct list_head *txs)
2268 {
2269         kib_tx_t          *tx;
2270         struct list_head  *ttmp;
2271         unsigned long      flags;
2272         int                timed_out = 0;
2273
2274         spin_lock_irqsave (&conn->ibc_lock, flags);
2275
2276         list_for_each (ttmp, txs) {
2277                 tx = list_entry (ttmp, kib_tx_t, tx_list);
2278
2279                 if (txs == &conn->ibc_active_txs) {
2280                         LASSERT (tx->tx_passive_rdma ||
2281                                  !tx->tx_passive_rdma_wait);
2282
2283                         LASSERT (tx->tx_passive_rdma_wait ||
2284                                  tx->tx_sending != 0);
2285                 } else {
2286                         LASSERT (!tx->tx_passive_rdma_wait);
2287                         LASSERT (tx->tx_sending == 0);
2288                 }
2289                 
2290                 if (time_after_eq (jiffies, tx->tx_deadline)) {
2291                         timed_out = 1;
2292                         break;
2293                 }
2294         }
2295
2296         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2297         return timed_out;
2298 }
2299
2300 int
2301 kibnal_conn_timed_out (kib_conn_t *conn)
2302 {
2303         return  kibnal_check_txs(conn, &conn->ibc_tx_queue) ||
2304                 kibnal_check_txs(conn, &conn->ibc_tx_queue_rsrvd) ||
2305                 kibnal_check_txs(conn, &conn->ibc_tx_queue_nocred) ||
2306                 kibnal_check_txs(conn, &conn->ibc_active_txs);
2307 }
2308
/* Periodic health check of every connection on peer hash bucket 'idx':
 * retry deferred credit returns and close any conn whose transmits have
 * timed out.  Called from the reaper thread. */
void
kibnal_check_conns (int idx)
{
        struct list_head  *peers = &kibnal_data.kib_peers[idx];
        struct list_head  *ptmp;
        kib_peer_t        *peer;
        kib_conn_t        *conn;
        struct list_head  *ctmp;
        unsigned long      flags;

 again:
        /* NB. We expect to have a look at all the peers and not find any
         * rdmas to time out, so we just use a shared lock while we
         * take a look... */
        read_lock_irqsave(&kibnal_data.kib_global_lock, flags);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, kib_peer_t, ibp_list);

                list_for_each (ctmp, &peer->ibp_conns) {
                        conn = list_entry (ctmp, kib_conn_t, ibc_list);

                        /* only established conns live on ibp_conns */
                        LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);


                        /* In case we have enough credits to return via a
                         * NOOP, but there were no non-blocking tx descs
                         * free to do it last time... */
                        kibnal_check_sends(conn);

                        if (!kibnal_conn_timed_out(conn))
                                continue;
                        
                        /* +1 ref keeps conn alive while the lock is dropped */
                        kibnal_conn_addref(conn);

                        read_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                               flags);

                        CERROR("Timed out RDMA with %s\n",
                               libcfs_nid2str(peer->ibp_nid));

                        kibnal_close_conn (conn, -ETIMEDOUT);
                        kibnal_conn_decref(conn);

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
}
2360
2361 void
2362 kibnal_terminate_conn (kib_conn_t *conn)
2363 {
2364         int           rc;
2365
2366         CDEBUG(D_NET, "conn %p\n", conn);
2367         LASSERT (conn->ibc_state == IBNAL_CONN_DEATHROW);
2368         conn->ibc_state = IBNAL_CONN_ZOMBIE;
2369
2370         rc = ib_cm_disconnect (conn->ibc_comm_id);
2371         if (rc != 0)
2372                 CERROR ("Error %d disconnecting conn %p -> %s\n",
2373                         rc, conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
2374
2375         kibnal_peer_notify(conn->ibc_peer);
2376 }
2377
/* Reaper thread: disposes of connections queued on kib_reaper_conns
 * (terminating DEATHROW conns, destroying INIT_QP/ZOMBIE ones) and
 * periodically sweeps a portion of the peer hash table looking for
 * timed-out transmits.  Runs until kib_shutdown is set. */
int
kibnal_reaper (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        kib_conn_t        *conn;
        int                timeout;
        int                i;
        int                peer_index = 0;       /* next hash bucket to sweep */
        unsigned long      deadline = jiffies;   /* next scheduled sweep time */
        
        cfs_daemonize ("kibnal_reaper");
        cfs_block_allsigs ();

        init_waitqueue_entry (&wait, current);

        spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);

        while (!kibnal_data.kib_shutdown) {
                /* first priority: drain the reaper queue */
                if (!list_empty (&kibnal_data.kib_reaper_conns)) {
                        conn = list_entry (kibnal_data.kib_reaper_conns.next,
                                           kib_conn_t, ibc_list);
                        list_del (&conn->ibc_list);
                        
                        /* drop the lock while handling this conn */
                        spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);

                        switch (conn->ibc_state) {
                        case IBNAL_CONN_DEATHROW:
                                LASSERT (conn->ibc_comm_id != TS_IB_CM_COMM_ID_INVALID);
                                /* Disconnect: conn becomes a zombie in the
                                 * callback and last ref reschedules it
                                 * here... */
                                kibnal_terminate_conn(conn);
                                kibnal_conn_decref(conn);
                                break;

                        case IBNAL_CONN_INIT_QP:
                        case IBNAL_CONN_ZOMBIE:
                                kibnal_destroy_conn (conn);
                                break;
                                
                        default:
                                CERROR ("Bad conn %p state: %d\n",
                                        conn, conn->ibc_state);
                                LBUG();
                        }

                        spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
                        continue;
                }

                spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);

                /* careful with the jiffy wrap... */
                while ((timeout = (int)(deadline - jiffies)) <= 0) {
                        const int n = 4;
                        const int p = 1;
                        int       chunk = kibnal_data.kib_peer_hash_size;
                        
                        /* Time to check for RDMA timeouts on a few more
                         * peers: I do checks every 'p' seconds on a
                         * proportion of the peer table and I need to check
                         * every connection 'n' times within a timeout
                         * interval, to ensure I detect a timeout on any
                         * connection within (n+1)/n times the timeout
                         * interval. */

                        if (*kibnal_tunables.kib_timeout > n * p)
                                chunk = (chunk * n * p) / 
                                        *kibnal_tunables.kib_timeout;
                        if (chunk == 0)
                                chunk = 1;

                        /* sweep 'chunk' buckets, wrapping round the table */
                        for (i = 0; i < chunk; i++) {
                                kibnal_check_conns (peer_index);
                                peer_index = (peer_index + 1) % 
                                             kibnal_data.kib_peer_hash_size;
                        }

                        deadline += p * HZ;
                }

                kibnal_data.kib_reaper_waketime = jiffies + timeout;

                /* sleep until woken (new conn queued for reaping) or
                 * 'timeout' jiffies elapse (time for the next sweep) */
                set_current_state (TASK_INTERRUPTIBLE);
                add_wait_queue (&kibnal_data.kib_reaper_waitq, &wait);

                schedule_timeout (timeout);

                set_current_state (TASK_RUNNING);
                remove_wait_queue (&kibnal_data.kib_reaper_waitq, &wait);

                spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
        }

        spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);

        kibnal_thread_fini ();
        return (0);
}
2478
/* Connection daemon (one of *kib_n_connd): services incoming service
 * queries queued on kib_connd_acceptq and initiates outgoing connections
 * to peers queued on kib_connd_peers.  'arg' is the thread index, used
 * only to name the daemon.  Runs until kib_shutdown is set. */
int
kibnal_connd (void *arg)
{
        long               id = (long)arg;
        char               name[16];
        wait_queue_t       wait;
        unsigned long      flags;
        kib_peer_t        *peer;
        kib_acceptsock_t  *as;
        int                did_something;

        snprintf(name, sizeof(name), "kibnal_connd_%02ld", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        init_waitqueue_entry (&wait, current);

        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);

        while (!kibnal_data.kib_shutdown) {
                did_something = 0;

                /* passive side: answer one incoming service query */
                if (!list_empty (&kibnal_data.kib_connd_acceptq)) {
                        as = list_entry (kibnal_data.kib_connd_acceptq.next,
                                         kib_acceptsock_t, ibas_list);
                        list_del (&as->ibas_list);
                        
                        /* drop the lock over the (blocking) socket I/O */
                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

                        kibnal_handle_svcqry(as->ibas_sock);
                        kibnal_free_acceptsock(as);
                        
                        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
                        did_something = 1;
                }
                        
                /* Only handle an outgoing connection request if there is someone left
                 * to handle an incoming svcqry */
                if (!list_empty (&kibnal_data.kib_connd_peers) &&
                    ((kibnal_data.kib_connd_connecting + 1) < 
                     *kibnal_tunables.kib_n_connd)) {
                        peer = list_entry (kibnal_data.kib_connd_peers.next,
                                           kib_peer_t, ibp_connd_list);
                        
                        list_del_init (&peer->ibp_connd_list);
                        /* count me as busy while the lock is dropped */
                        kibnal_data.kib_connd_connecting++;
                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

                        kibnal_connect_peer (peer);
                        kibnal_peer_decref(peer);

                        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                        did_something = 1;
                        kibnal_data.kib_connd_connecting--;
                }

                /* re-poll both queues before sleeping if any work was done */
                if (did_something)
                        continue;

                /* exclusive wait: only one connd wakes per wake_up */
                set_current_state (TASK_INTERRUPTIBLE);
                add_wait_queue_exclusive(&kibnal_data.kib_connd_waitq, &wait);

                spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

                schedule();

                set_current_state (TASK_RUNNING);
                remove_wait_queue (&kibnal_data.kib_connd_waitq, &wait);

                spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
        }

        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

        kibnal_thread_fini ();
        return (0);
}
2556
/* Scheduler thread: finalises transmits queued on kib_sched_txq (via
 * kibnal_tx_done) and dispatches received messages queued on kib_sched_rxq
 * (via kibnal_rx).  'arg' is the thread index, used only for naming.
 * Yields the CPU after IBNAL_RESCHED consecutive busy passes. */
int
kibnal_scheduler(void *arg)
{
        long            id = (long)arg;
        char            name[16];
        kib_rx_t       *rx;
        kib_tx_t       *tx;
        unsigned long   flags;
        int             rc;
        int             counter = 0;    /* busy passes since last resched */
        int             did_something;

        snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);

        while (!kibnal_data.kib_shutdown) {
                did_something = 0;

                /* drain ALL pending tx completions, dropping the lock
                 * around each kibnal_tx_done() call */
                while (!list_empty(&kibnal_data.kib_sched_txq)) {
                        tx = list_entry(kibnal_data.kib_sched_txq.next,
                                        kib_tx_t, tx_list);
                        list_del(&tx->tx_list);
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);
                        kibnal_tx_done(tx);

                        spin_lock_irqsave(&kibnal_data.kib_sched_lock,
                                          flags);
                }

                /* handle ONE rx per pass, so tx completions stay fresh */
                if (!list_empty(&kibnal_data.kib_sched_rxq)) {
                        rx = list_entry(kibnal_data.kib_sched_rxq.next,
                                        kib_rx_t, rx_list);
                        list_del(&rx->rx_list);
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);

                        kibnal_rx(rx);

                        did_something = 1;
                        spin_lock_irqsave(&kibnal_data.kib_sched_lock,
                                          flags);
                }

                /* NOTE(review): the tx-drain loop above never sets
                 * did_something, so a pure-tx pass goes straight to the
                 * wait_event below; its condition re-checks both queues,
                 * so no work is lost -- confirm this is intentional */

                /* nothing to do or hogging CPU */
                if (!did_something || counter++ == IBNAL_RESCHED) {
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);
                        counter = 0;

                        if (!did_something) {
                                /* exclusive: one scheduler wakes per event */
                                rc = wait_event_interruptible_exclusive(
                                        kibnal_data.kib_sched_waitq,
                                        !list_empty(&kibnal_data.kib_sched_txq) || 
                                        !list_empty(&kibnal_data.kib_sched_rxq) || 
                                        kibnal_data.kib_shutdown);
                        } else {
                                /* been busy a while: let others run */
                                our_cond_resched();
                        }

                        spin_lock_irqsave(&kibnal_data.kib_sched_lock,
                                          flags);
                }
        }

        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);

        kibnal_thread_fini();
        return (0);
}