/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/openiblnd/openiblnd_cb.c
 *
 * Author: Eric Barton <eric@bartonsoftware.com>
 */

#include "openiblnd.h"

/*
 *  LIB functions follow
 *
 */
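
/* Transmit descriptors (kib_tx_t) cycle between the global idle pool
 * (kib_idle_txs), a connection's send queues and its active list.
 * kibnal_tx_done() is the single point where a descriptor is unmapped,
 * its lnet messages are finalised and it returns to the idle pool. */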
void
kibnal_schedule_tx_done (kib_tx_t *tx)
{
        unsigned long flags;

        spin_lock_irqsave (&kibnal_data.kib_sched_lock, flags);

        list_add_tail(&tx->tx_list, &kibnal_data.kib_sched_txq);
        wake_up (&kibnal_data.kib_sched_waitq);

        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
}

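/* Complete 'tx': deregister any mapped memory, finalise its lnet
 * messages and return it to the idle pool.  Must not run while the tx
 * is still awaiting a send or RDMA completion; in IRQ context the work
 * is handed off to a scheduler thread via kibnal_schedule_tx_done(). */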
void
kibnal_tx_done (kib_tx_t *tx)
{
        lnet_msg_t      *lntmsg[2];
        unsigned long    flags;
        int              i;
        int              rc;

        LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting callback */
        LASSERT (!tx->tx_passive_rdma_wait);    /* mustn't be awaiting RDMA */

        if (in_interrupt()) {
                /* can't deregister memory/flush FMAs/finalize in IRQ context... */
                kibnal_schedule_tx_done(tx);
                return;
        }

        switch (tx->tx_mapped) {
        default:
                LBUG();

        case KIB_TX_UNMAPPED:
                break;

        case KIB_TX_MAPPED:
                rc = ib_memory_deregister(tx->tx_md.md_handle.mr);
                LASSERT (rc == 0);
                tx->tx_mapped = KIB_TX_UNMAPPED;
                break;

#if IBNAL_FMR
        case KIB_TX_MAPPED_FMR:
                rc = ib_fmr_deregister(tx->tx_md.md_handle.fmr);
                LASSERT (rc == 0);

#ifndef USING_TSAPI
                /* Somewhat belt-and-braces since the tx's conn has closed if
                 * this was a passive RDMA waiting to complete... */
                if (tx->tx_status != 0)
                        ib_fmr_pool_force_flush(kibnal_data.kib_fmr_pool);
#endif
                tx->tx_mapped = KIB_TX_UNMAPPED;
                break;
#endif
        }

        /* tx may have up to 2 lnet msgs to finalise */
        lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
        lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
        rc = tx->tx_status;

        if (tx->tx_conn != NULL) {
                kibnal_conn_decref(tx->tx_conn);
                tx->tx_conn = NULL;
        }

        tx->tx_nsp = 0;
        tx->tx_passive_rdma = 0;
        tx->tx_status = 0;

        spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);

        list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_txs);

        spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);

        /* delay finalize until my descs have been freed */
        for (i = 0; i < 2; i++) {
                if (lntmsg[i] == NULL)
                        continue;

                lnet_finalize (kibnal_data.kib_ni, lntmsg[i], rc);
        }
}

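/* Grab a tx descriptor from the idle pool, or NULL if none are free.
 * A fresh passive RDMA cookie is allocated while the tx lock is held. */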
kib_tx_t *
kibnal_get_idle_tx (void)
{
        unsigned long  flags;
        kib_tx_t      *tx;

        spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);

        if (list_empty (&kibnal_data.kib_idle_txs)) {
                spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
                return NULL;
        }

        tx = list_entry (kibnal_data.kib_idle_txs.next, kib_tx_t, tx_list);
        list_del (&tx->tx_list);

        /* Allocate a new passive RDMA completion cookie.  It might not be
         * needed, but we've got a lock right now and we're unlikely to
         * wrap... */
        tx->tx_passive_rdma_cookie = kibnal_data.kib_next_tx_cookie++;

        spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);

        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
        LASSERT (tx->tx_nsp == 0);
        LASSERT (tx->tx_sending == 0);
        LASSERT (tx->tx_status == 0);
        LASSERT (tx->tx_conn == NULL);
        LASSERT (!tx->tx_passive_rdma);
        LASSERT (!tx->tx_passive_rdma_wait);
        LASSERT (tx->tx_lntmsg[0] == NULL);
        LASSERT (tx->tx_lntmsg[1] == NULL);

        return tx;
}

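/* A peer signals completion of a passive RDMA with a PUT_DONE/GET_DONE
 * message carrying the cookie the tx advertised.  Find the matching tx
 * on the connection's active list and complete it; whoever makes the
 * tx idle gets to free it. */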
void
kibnal_complete_passive_rdma(kib_conn_t *conn, __u64 cookie, int status)
{
        struct list_head *ttmp;
        unsigned long     flags;
        int               idle;

        spin_lock_irqsave (&conn->ibc_lock, flags);

        list_for_each (ttmp, &conn->ibc_active_txs) {
                kib_tx_t *tx = list_entry(ttmp, kib_tx_t, tx_list);

                LASSERT (tx->tx_passive_rdma ||
                         !tx->tx_passive_rdma_wait);

                LASSERT (tx->tx_passive_rdma_wait ||
                         tx->tx_sending != 0);

                if (!tx->tx_passive_rdma_wait ||
                    tx->tx_passive_rdma_cookie != cookie)
                        continue;

                CDEBUG(D_NET, "Complete %p "LPD64": %d\n", tx, cookie, status);

                /* XXX Set mlength of reply here */

                tx->tx_status = status;
                tx->tx_passive_rdma_wait = 0;
                idle = (tx->tx_sending == 0);

                if (idle)
                        list_del (&tx->tx_list);

                spin_unlock_irqrestore (&conn->ibc_lock, flags);

                /* I could be racing with tx callbacks.  It's whoever
                 * _makes_ tx idle that frees it */
                if (idle)
                        kibnal_tx_done (tx);
                return;
        }

        spin_unlock_irqrestore (&conn->ibc_lock, flags);

        CERROR ("Unmatched (late?) RDMA completion "LPX64" from %s\n",
                cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
}

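/* Re-post a receive buffer.  'credit' returns a flow-control credit to
 * the peer (piggy-backed on a later send); 'rsrvd_credit' returns a
 * reserved RDMA-reply credit, which is only legal on connections whose
 * protocol version reserves them. */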
void
kibnal_post_rx (kib_rx_t *rx, int credit, int rsrvd_credit)
{
        kib_conn_t   *conn = rx->rx_conn;
        int           rc;
        unsigned long flags;

        LASSERT(!rsrvd_credit ||
                conn->ibc_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);

        rx->rx_gl = (struct ib_gather_scatter) {
                .address = rx->rx_vaddr,
                .length  = IBNAL_MSG_SIZE,
                .key     = conn->ibc_rx_pages->ibp_lkey,
        };

        rx->rx_sp = (struct ib_receive_param) {
                .work_request_id        = kibnal_ptr2wreqid(rx, 1),
                .scatter_list           = &rx->rx_gl,
                .num_scatter_entries    = 1,
                .device_specific        = NULL,
                .signaled               = 1,
        };

        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
        LASSERT (rx->rx_nob >= 0);              /* not posted */
        rx->rx_nob = -1;                        /* is now */
        mb();

        if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
                rc = -ECONNABORTED;
        else
                rc = kibnal_ib_receive(conn->ibc_qp, &rx->rx_sp);

        if (rc == 0) {
                if (credit || rsrvd_credit) {
                        spin_lock_irqsave(&conn->ibc_lock, flags);

                        if (credit)
                                conn->ibc_outstanding_credits++;
                        if (rsrvd_credit)
                                conn->ibc_reserved_credits++;

                        spin_unlock_irqrestore(&conn->ibc_lock, flags);

                        kibnal_check_sends(conn);
                }
                return;
        }

        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                CERROR ("Error posting receive -> %s: %d\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
                kibnal_close_conn (rx->rx_conn, rc);
        } else {
                CDEBUG (D_NET, "Error posting receive -> %s: %d\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
        }

        /* Drop rx's ref */
        kibnal_conn_decref(conn);
}

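/* Receive completion callback (IRQ context).  Validate the message,
 * absorb any credits the peer returned, then either consume the rx
 * here (NOOP and RDMA-completion messages) or queue it for kibnal_rx()
 * in thread context. */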
void
kibnal_rx_callback (struct ib_cq_entry *e)
{
        kib_rx_t     *rx = (kib_rx_t *)kibnal_wreqid2ptr(e->work_request_id);
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        int           credits;
        unsigned long flags;
        int           rc;
        int           err = -ECONNABORTED;

        CDEBUG (D_NET, "rx %p conn %p\n", rx, conn);
        LASSERT (rx->rx_nob < 0);               /* was posted */
        rx->rx_nob = 0;                         /* isn't now */
        mb();

        /* receives complete with error in any case after we've started
         * closing the QP */
        if (conn->ibc_state >= IBNAL_CONN_DEATHROW)
                goto failed;

        /* We don't post receives until the conn is established */
        LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);

        if (e->status != IB_COMPLETION_STATUS_SUCCESS) {
                CERROR("Rx from %s failed: %d\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid), e->status);
                goto failed;
        }

        LASSERT (e->bytes_transferred >= 0);
        rx->rx_nob = e->bytes_transferred;
        mb();

        rc = kibnal_unpack_msg(msg, conn->ibc_version, rx->rx_nob);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from %s\n",
                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                goto failed;
        }

        if (conn->ibc_peer->ibp_nid != msg->ibm_srcnid ||
            kibnal_data.kib_ni->ni_nid != msg->ibm_dstnid ||
            msg->ibm_srcstamp != conn->ibc_incarnation ||
            msg->ibm_dststamp != kibnal_data.kib_incarnation) {
                CERROR ("Stale rx from %s\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
                err = -ESTALE;
                goto failed;
        }

        /* Have I received credits that will let me send? */
        credits = msg->ibm_credits;
        if (credits != 0) {
                spin_lock_irqsave(&conn->ibc_lock, flags);
                conn->ibc_credits += credits;
                spin_unlock_irqrestore(&conn->ibc_lock, flags);

                kibnal_check_sends(conn);
        }

        switch (msg->ibm_type) {
        case IBNAL_MSG_NOOP:
                kibnal_post_rx (rx, 1, 0);
                return;

        case IBNAL_MSG_IMMEDIATE:
                break;

        case IBNAL_MSG_PUT_RDMA:
        case IBNAL_MSG_GET_RDMA:
                CDEBUG(D_NET, "%d RDMA: cookie "LPX64", key %x, addr "LPX64", nob %d\n",
                       msg->ibm_type, msg->ibm_u.rdma.ibrm_cookie,
                       msg->ibm_u.rdma.ibrm_desc.rd_key,
                       msg->ibm_u.rdma.ibrm_desc.rd_addr,
                       msg->ibm_u.rdma.ibrm_desc.rd_nob);
                break;

        case IBNAL_MSG_PUT_DONE:
        case IBNAL_MSG_GET_DONE:
                CDEBUG(D_NET, "%d DONE: cookie "LPX64", status %d\n",
                       msg->ibm_type, msg->ibm_u.completion.ibcm_cookie,
                       msg->ibm_u.completion.ibcm_status);

                kibnal_complete_passive_rdma (conn,
                                              msg->ibm_u.completion.ibcm_cookie,
                                              msg->ibm_u.completion.ibcm_status);

                if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {
                        kibnal_post_rx (rx, 1, 0);
                } else {
                        /* this reply buffer was pre-reserved */
                        kibnal_post_rx (rx, 0, 1);
                }
                return;

        default:
                CERROR ("Bad msg type %x from %s\n",
                        msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                goto failed;
        }

        kibnal_peer_alive(conn->ibc_peer);

        /* schedule for kibnal_rx() in thread context */
        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);

        list_add_tail (&rx->rx_list, &kibnal_data.kib_sched_rxq);
        wake_up (&kibnal_data.kib_sched_waitq);

        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
        return;

 failed:
        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        kibnal_close_conn(conn, err);

        /* Don't re-post rx & drop its ref on conn */
        kibnal_conn_decref(conn);
}

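/* Thread-context half of receive handling: hand the message header to
 * LNet via lnet_parse(), which calls back with this rx as 'private'
 * (see kibnal_recv()).  NB the final argument flags whether the
 * message describes an RDMA (1 for the GET/PUT RDMA variants). */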
void
kibnal_rx (kib_rx_t *rx)
{
        int          rc = 0;
        kib_msg_t   *msg = rx->rx_msg;

        switch (msg->ibm_type) {
        case IBNAL_MSG_GET_RDMA:
                rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.rdma.ibrm_hdr,
                                msg->ibm_srcnid, rx, 1);
                break;

        case IBNAL_MSG_PUT_RDMA:
                rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.rdma.ibrm_hdr,
                                msg->ibm_srcnid, rx, 1);
                break;

        case IBNAL_MSG_IMMEDIATE:
                rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.immediate.ibim_hdr,
                                msg->ibm_srcnid, rx, 0);
                break;

        default:
                LBUG();
                break;
        }

        if (rc < 0) {
                kibnal_close_conn(rx->rx_conn, rc);
                kibnal_post_rx (rx, 1, 0);
        }
}

#if 0
int
kibnal_kvaddr_to_phys (unsigned long vaddr, __u64 *physp)
{
        struct page *page;

        if (vaddr >= VMALLOC_START &&
            vaddr < VMALLOC_END)
                page = vmalloc_to_page ((void *)vaddr);
#ifdef CONFIG_HIGHMEM
        else if (vaddr >= PKMAP_BASE &&
                 vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
                page = vmalloc_to_page ((void *)vaddr);
        /* in 2.4 ^ just walks the page tables */
#endif
        else
                page = virt_to_page (vaddr);

        if (page == NULL ||
            !VALID_PAGE (page))
                return (-EFAULT);

        *physp = lnet_page2phys(page) + (vaddr & (PAGE_SIZE - 1));
        return (0);
}
#endif

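/* Map a single virtually-contiguous fragment for RDMA.  Only one iov
 * fragment can be covered by one memory region, hence -EMSGSIZE for
 * anything that spans fragments. */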
int
kibnal_map_iov (kib_tx_t *tx, int access,
                unsigned int niov, struct iovec *iov, int offset, int nob)
{
        void   *vaddr;
        int     rc;

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR ("Can't map multiple vaddr fragments\n");
                return (-EMSGSIZE);
        }

        vaddr = (void *)(((unsigned long)iov->iov_base) + offset);
        tx->tx_md.md_addr = (__u64)((unsigned long)vaddr);

        rc = ib_memory_register (kibnal_data.kib_pd,
                                 vaddr, nob,
                                 access,
                                 &tx->tx_md.md_handle.mr,
                                 &tx->tx_md.md_lkey,
                                 &tx->tx_md.md_rkey);

        if (rc != 0) {
                CERROR ("Can't map vaddr: %d\n", rc);
                return (rc);
        }

        tx->tx_mapped = KIB_TX_MAPPED;
        return (0);
}

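/* Map a page-array (kiov) payload for RDMA by building a physical page
 * list.  Pages after the first must tile contiguously (no gaps), since
 * the whole payload is mapped as one virtually-contiguous region. */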
int
kibnal_map_kiov (kib_tx_t *tx, int access,
                 int nkiov, lnet_kiov_t *kiov,
                 int offset, int nob)
{
#if IBNAL_FMR
        __u64                      *phys;
        const int                   mapped = KIB_TX_MAPPED_FMR;
#else
        struct ib_physical_buffer  *phys;
        const int                   mapped = KIB_TX_MAPPED;
#endif
        int                         page_offset;
        int                         nphys;
        int                         resid;
        int                         phys_size;
        int                         rc;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        phys_size = nkiov * sizeof (*phys);
        LIBCFS_ALLOC(phys, phys_size);
        if (phys == NULL) {
                CERROR ("Can't allocate tmp phys\n");
                return (-ENOMEM);
        }

        page_offset = kiov->kiov_offset + offset;
#if IBNAL_FMR
        phys[0] = lnet_page2phys(kiov->kiov_page);
#else
        phys[0].address = lnet_page2phys(kiov->kiov_page);
        phys[0].size = PAGE_SIZE;
#endif
        nphys = 1;
        resid = nob - (kiov->kiov_len - offset);

        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        int i;
                        /* Can't have gaps */
                        CERROR ("Can't make payload contiguous in I/O VM: "
                                "page %d, offset %d, len %d \n", nphys,
                                kiov->kiov_offset, kiov->kiov_len);

                        for (i = -nphys; i < nkiov; i++) {
                                CERROR("kiov[%d] %p +%d for %d\n",
                                       i, kiov[i].kiov_page, kiov[i].kiov_offset, kiov[i].kiov_len);
                        }

                        rc = -EINVAL;
                        goto out;
                }

                if (nphys == LNET_MAX_IOV) {
                        CERROR ("payload too big (%d)\n", nphys);
                        rc = -EMSGSIZE;
                        goto out;
                }

                LASSERT (nphys * sizeof (*phys) < phys_size);
#if IBNAL_FMR
                phys[nphys] = lnet_page2phys(kiov->kiov_page);
#else
                phys[nphys].address = lnet_page2phys(kiov->kiov_page);
                phys[nphys].size = PAGE_SIZE;
#endif
                nphys++;

                resid -= PAGE_SIZE;
        }

        tx->tx_md.md_addr = IBNAL_RDMA_BASE;

#if IBNAL_FMR
        rc = ib_fmr_register_physical (kibnal_data.kib_fmr_pool,
                                       phys, nphys,
                                       &tx->tx_md.md_addr,
                                       page_offset,
                                       &tx->tx_md.md_handle.fmr,
                                       &tx->tx_md.md_lkey,
                                       &tx->tx_md.md_rkey);
#else
        rc = ib_memory_register_physical (kibnal_data.kib_pd,
                                          phys, nphys,
                                          &tx->tx_md.md_addr,
                                          nob, page_offset,
                                          access,
                                          &tx->tx_md.md_handle.mr,
                                          &tx->tx_md.md_lkey,
                                          &tx->tx_md.md_rkey);
#endif
        if (rc == 0) {
                CDEBUG(D_NET, "Mapped %d pages %d bytes @ offset %d: lkey %x, rkey %x\n",
                       nphys, nob, page_offset, tx->tx_md.md_lkey, tx->tx_md.md_rkey);
                tx->tx_mapped = mapped;
        } else {
                CERROR ("Can't map phys: %d\n", rc);
                rc = -EFAULT;
        }

 out:
        LIBCFS_FREE(phys, phys_size);
        return (rc);
}

kib_conn_t *
kibnal_find_conn_locked (kib_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->ibp_conns) {
                return (list_entry(tmp, kib_conn_t, ibc_list));
        }

        return (NULL);
}

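/* Push queued sends down the QP, subject to flow control: each send
 * (except on the no-credit queue) consumes one of our send credits,
 * and the last credit is held back unless we also have credits to
 * return.  Credits owed to the peer travel in each message's
 * ibm_credits field; a NOOP is sent when enough credits must be
 * returned (or a keepalive is due) but nothing else is queued. */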
void
kibnal_check_sends (kib_conn_t *conn)
{
        unsigned long   flags;
        kib_tx_t       *tx;
        int             rc;
        int             i;
        int             consume_credit;
        int             done;
        int             nwork;

        spin_lock_irqsave (&conn->ibc_lock, flags);

        LASSERT (conn->ibc_nsends_posted <= IBNAL_RX_MSGS);
        LASSERT (conn->ibc_reserved_credits >= 0);

        while (conn->ibc_reserved_credits > 0 &&
               !list_empty(&conn->ibc_tx_queue_rsrvd)) {
                LASSERT (conn->ibc_version !=
                         IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
                tx = list_entry(conn->ibc_tx_queue_rsrvd.next,
                                kib_tx_t, tx_list);
                list_del(&tx->tx_list);
                list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
                conn->ibc_reserved_credits--;
        }

        if (list_empty(&conn->ibc_tx_queue) &&
            list_empty(&conn->ibc_tx_queue_nocred) &&
            (conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER ||
             kibnal_send_keepalive(conn))) {
                spin_unlock_irqrestore(&conn->ibc_lock, flags);

                tx = kibnal_get_idle_tx();
                if (tx != NULL)
                        kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);

                spin_lock_irqsave(&conn->ibc_lock, flags);

                if (tx != NULL)
                        kibnal_queue_tx_locked(tx, conn);
        }

        for (;;) {
                if (!list_empty(&conn->ibc_tx_queue_nocred)) {
                        LASSERT (conn->ibc_version !=
                                 IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
                        tx = list_entry(conn->ibc_tx_queue_nocred.next,
                                        kib_tx_t, tx_list);
                        consume_credit = 0;
                } else if (!list_empty (&conn->ibc_tx_queue)) {
                        tx = list_entry (conn->ibc_tx_queue.next,
                                         kib_tx_t, tx_list);
                        consume_credit = 1;
                } else {
                        /* nothing waiting */
                        break;
                }

                /* We rely on this for QP sizing */
                LASSERT (tx->tx_nsp > 0 && tx->tx_nsp <= 2);

                LASSERT (conn->ibc_outstanding_credits >= 0);
                LASSERT (conn->ibc_outstanding_credits <= IBNAL_MSG_QUEUE_SIZE);
                LASSERT (conn->ibc_credits >= 0);
                LASSERT (conn->ibc_credits <= IBNAL_MSG_QUEUE_SIZE);

                /* Not on ibc_rdma_queue */
                LASSERT (!tx->tx_passive_rdma_wait);

                if (conn->ibc_nsends_posted == IBNAL_RX_MSGS)
                        break;

                if (consume_credit) {
                        if (conn->ibc_credits == 0)     /* no credits */
                                break;

                        if (conn->ibc_credits == 1 &&   /* last credit reserved for */
                            conn->ibc_outstanding_credits == 0) /* giving back credits */
                                break;
                }

                list_del (&tx->tx_list);

                if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
                    (!list_empty(&conn->ibc_tx_queue) ||
                     !list_empty(&conn->ibc_tx_queue_nocred) ||
                     (conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER &&
                      !kibnal_send_keepalive(conn)))) {
                        /* redundant NOOP */
                        spin_unlock_irqrestore(&conn->ibc_lock, flags);
                        kibnal_tx_done(tx);
                        spin_lock_irqsave(&conn->ibc_lock, flags);
                        continue;
                }

                kibnal_pack_msg(tx->tx_msg, conn->ibc_version,
                                conn->ibc_outstanding_credits,
                                conn->ibc_peer->ibp_nid, conn->ibc_incarnation);

                conn->ibc_outstanding_credits = 0;
                conn->ibc_nsends_posted++;
                if (consume_credit)
                        conn->ibc_credits--;

                tx->tx_sending = tx->tx_nsp;
                tx->tx_passive_rdma_wait = tx->tx_passive_rdma;
                list_add (&tx->tx_list, &conn->ibc_active_txs);

                spin_unlock_irqrestore (&conn->ibc_lock, flags);

                /* NB the gap between removing tx from the queue and sending it
                 * allows message re-ordering to occur */

                LASSERT (tx->tx_nsp > 0);

                rc = -ECONNABORTED;
                nwork = 0;
                if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                        tx->tx_status = 0;
                        /* Driver only accepts 1 item at a time */
                        for (i = 0; i < tx->tx_nsp; i++) {
                                rc = kibnal_ib_send(conn->ibc_qp, &tx->tx_sp[i]);
                                if (rc != 0)
                                        break;
                                nwork++;
                        }
                }

                conn->ibc_last_send = jiffies;

                spin_lock_irqsave (&conn->ibc_lock, flags);
                if (rc != 0) {
                        /* NB credits are transferred in the actual
                         * message, which can only be the last work item */
                        conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
                        if (consume_credit)
                                conn->ibc_credits++;
                        conn->ibc_nsends_posted--;

                        tx->tx_status = rc;
                        tx->tx_passive_rdma_wait = 0;
                        tx->tx_sending -= tx->tx_nsp - nwork;

                        done = (tx->tx_sending == 0);
                        if (done)
                                list_del (&tx->tx_list);

                        spin_unlock_irqrestore (&conn->ibc_lock, flags);

                        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
                                CERROR ("Error %d posting transmit to %s\n",
                                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        else
                                CDEBUG (D_NET, "Error %d posting transmit to %s\n",
                                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));

                        kibnal_close_conn (conn, rc);

                        if (done)
                                kibnal_tx_done (tx);
                        return;
                }
        }

        spin_unlock_irqrestore (&conn->ibc_lock, flags);
}

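/* Send completion callback.  NB a tx with tx_nsp work items receives
 * tx_nsp completion callbacks; only the callback that makes the tx
 * idle (no sends outstanding and no passive RDMA pending) may free
 * it. */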
void
kibnal_tx_callback (struct ib_cq_entry *e)
{
        kib_tx_t     *tx = (kib_tx_t *)kibnal_wreqid2ptr(e->work_request_id);
        kib_conn_t   *conn;
        unsigned long flags;
        int           idle;

        conn = tx->tx_conn;
        LASSERT (conn != NULL);
        LASSERT (tx->tx_sending != 0);

        spin_lock_irqsave(&conn->ibc_lock, flags);

        CDEBUG(D_NET, "conn %p tx %p [%d/%d]: %d\n", conn, tx,
               tx->tx_nsp - tx->tx_sending, tx->tx_nsp,
               e->status);

        /* I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'.  If it's
         * not me, then I take an extra ref on conn so it can't disappear
         * under me. */

        tx->tx_sending--;
        idle = (tx->tx_sending == 0) &&         /* This is the final callback */
               (!tx->tx_passive_rdma_wait);     /* Not waiting for RDMA completion */
        if (idle)
                list_del(&tx->tx_list);

        kibnal_conn_addref(conn);

        if (tx->tx_sending == 0)
                conn->ibc_nsends_posted--;

        if (e->status != IB_COMPLETION_STATUS_SUCCESS &&
            tx->tx_status == 0)
                tx->tx_status = -ECONNABORTED;

        spin_unlock_irqrestore(&conn->ibc_lock, flags);

        if (idle)
                kibnal_tx_done (tx);

        if (e->status != IB_COMPLETION_STATUS_SUCCESS) {
                CDEBUG (D_NETERROR, "Tx completion to %s failed: %d\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid), e->status);
                kibnal_close_conn (conn, -ENETDOWN);
        } else {
                kibnal_peer_alive(conn->ibc_peer);
                /* can I shovel some more sends out the door? */
                kibnal_check_sends(conn);
        }

        kibnal_conn_decref(conn);
}

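/* CQ completion demultiplexer: the low bit of the work request id
 * distinguishes receives (kibnal_ptr2wreqid(rx, 1)) from sends
 * (kibnal_ptr2wreqid(tx, 0)). */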
void
kibnal_callback (ib_cq_t *cq, struct ib_cq_entry *e, void *arg)
{
        if (kibnal_wreqid_is_rx(e->work_request_id))
                kibnal_rx_callback (e);
        else
                kibnal_tx_callback (e);
}

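/* Append a message send to 'tx': one gather entry covering the
 * message buffer plus one send work request.  A PUT_DONE bundled after
 * an RDMA read is fenced so the completion message can't overtake the
 * RDMA. */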
void
kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob)
{
        struct ib_gather_scatter *gl = &tx->tx_gl[tx->tx_nsp];
        struct ib_send_param     *sp = &tx->tx_sp[tx->tx_nsp];
        int                       fence;
        int                       nob = offsetof (kib_msg_t, ibm_u) + body_nob;

        LASSERT (tx->tx_nsp >= 0 &&
                 tx->tx_nsp < sizeof(tx->tx_sp)/sizeof(tx->tx_sp[0]));
        LASSERT (nob <= IBNAL_MSG_SIZE);

        kibnal_init_msg(tx->tx_msg, type, body_nob);

        /* Fence the message if it's bundled with an RDMA read */
        fence = (tx->tx_nsp > 0) &&
                (type == IBNAL_MSG_PUT_DONE);

        *gl = (struct ib_gather_scatter) {
                .address = tx->tx_vaddr,
                .length  = nob,
                .key     = kibnal_data.kib_tx_pages->ibp_lkey,
        };

        /* NB If this is an RDMA read, the completion message must wait for
         * the RDMA to complete.  Sends wait for previous RDMA writes
         * anyway... */
        *sp = (struct ib_send_param) {
                .work_request_id      = kibnal_ptr2wreqid(tx, 0),
                .op                   = IB_OP_SEND,
                .gather_list          = gl,
                .num_gather_entries   = 1,
                .device_specific      = NULL,
                .solicited_event      = 1,
                .signaled             = 1,
                .immediate_data_valid = 0,
                .fence                = fence,
                .inline_data          = 0,
        };

        tx->tx_nsp++;
}

void
kibnal_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
{
        unsigned long         flags;

        spin_lock_irqsave(&conn->ibc_lock, flags);

        kibnal_queue_tx_locked (tx, conn);

        spin_unlock_irqrestore(&conn->ibc_lock, flags);

        kibnal_check_sends(conn);
}

void
kibnal_schedule_active_connect_locked (kib_peer_t *peer)
{
        /* Called with exclusive kib_global_lock */

        peer->ibp_connecting++;
        kibnal_peer_addref(peer); /* extra ref for connd */

        spin_lock (&kibnal_data.kib_connd_lock);

        LASSERT (list_empty(&peer->ibp_connd_list));
        list_add_tail (&peer->ibp_connd_list,
                       &kibnal_data.kib_connd_peers);
        wake_up (&kibnal_data.kib_connd_waitq);

        spin_unlock (&kibnal_data.kib_connd_lock);
}

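/* Commit to sending 'tx' to 'nid': queue it on an existing connection
 * if there is one, otherwise queue it on the peer and kick off
 * connection establishment.  NB the read -> write lock transition
 * drops the lock, so the peer lookup must be retried. */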
void
kibnal_launch_tx (kib_tx_t *tx, lnet_nid_t nid)
{
        unsigned long    flags;
        kib_peer_t      *peer;
        kib_conn_t      *conn;
        int              retry;
        int              rc;
        rwlock_t        *g_lock = &kibnal_data.kib_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
        LASSERT (tx->tx_nsp > 0);               /* work items have been set up */

        for (retry = 0; ; retry = 1) {
                read_lock_irqsave(g_lock, flags);

                peer = kibnal_find_peer_locked (nid);
                if (peer != NULL) {
                        conn = kibnal_find_conn_locked (peer);
                        if (conn != NULL) {
                                kibnal_conn_addref(conn); /* 1 ref for me...*/
                                read_unlock_irqrestore(g_lock, flags);

                                kibnal_queue_tx (tx, conn);
                                kibnal_conn_decref(conn); /* ...until here */
                                return;
                        }
                }

                /* Making one or more connections; I'll need a write lock... */
                read_unlock(g_lock);
                write_lock(g_lock);

                peer = kibnal_find_peer_locked (nid);
                if (peer != NULL)
                        break;

                write_unlock_irqrestore (g_lock, flags);

                if (retry) {
                        CERROR("Can't find peer %s\n", libcfs_nid2str(nid));
                        tx->tx_status = -EHOSTUNREACH;
                        kibnal_tx_done (tx);
                        return;
                }

                rc = kibnal_add_persistent_peer(nid, LNET_NIDADDR(nid),
                                                lnet_acceptor_port());
                if (rc != 0) {
                        CERROR("Can't add peer %s: %d\n",
                               libcfs_nid2str(nid), rc);
                        tx->tx_status = rc;
                        kibnal_tx_done(tx);
                        return;
                }
        }

        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kibnal_conn_addref(conn);       /* +1 ref from me... */
                write_unlock_irqrestore (g_lock, flags);

                kibnal_queue_tx (tx, conn);
                kibnal_conn_decref(conn);       /* ...until here */
                return;
        }

        if (peer->ibp_connecting == 0 &&
            peer->ibp_accepting == 0) {
                if (!(peer->ibp_reconnect_interval == 0 || /* first attempt */
                      time_after_eq(jiffies, peer->ibp_reconnect_time))) {
                        write_unlock_irqrestore (g_lock, flags);
                        tx->tx_status = -EHOSTUNREACH;
                        kibnal_tx_done (tx);
                        return;
                }

                kibnal_schedule_active_connect_locked(peer);
        }

        /* A connection is being established; queue the message... */
        list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);

        write_unlock_irqrestore (g_lock, flags);
}

void
kibnal_txlist_done (struct list_head *txlist, int status)
{
        kib_tx_t *tx;

        while (!list_empty(txlist)) {
                tx = list_entry (txlist->next, kib_tx_t, tx_list);

                list_del (&tx->tx_list);
                /* complete now */
                tx->tx_status = status;
                kibnal_tx_done (tx);
        }
}

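/* Start a "passive" RDMA: map the local buffer and advertise its rkey,
 * address and length to the peer, who performs the transfer and
 * replies with PUT_DONE/GET_DONE (matched by cookie in
 * kibnal_complete_passive_rdma()). */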
int
kibnal_start_passive_rdma (int type, lnet_msg_t *lntmsg,
                           int niov, struct iovec *iov, lnet_kiov_t *kiov,
                           int nob)
{
        lnet_nid_t  nid = lntmsg->msg_target.nid;
        kib_tx_t   *tx;
        kib_msg_t  *ibmsg;
        int         rc;
        int         access;

        LASSERT (type == IBNAL_MSG_PUT_RDMA ||
                 type == IBNAL_MSG_GET_RDMA);
        LASSERT (nob > 0);
        LASSERT (!in_interrupt());              /* Mapping could block */

        if (type == IBNAL_MSG_PUT_RDMA) {
                access = IB_ACCESS_REMOTE_READ;
        } else {
                access = IB_ACCESS_REMOTE_WRITE |
                         IB_ACCESS_LOCAL_WRITE;
        }

        tx = kibnal_get_idle_tx ();
        if (tx == NULL) {
                CERROR("Can't allocate %s txd for %s\n",
                       (type == IBNAL_MSG_PUT_RDMA) ? "PUT/REPLY" : "GET",
                       libcfs_nid2str(nid));
                return -ENOMEM;
        }

        if (iov != NULL)
                rc = kibnal_map_iov (tx, access, niov, iov, 0, nob);
        else
                rc = kibnal_map_kiov (tx, access, niov, kiov, 0, nob);

        if (rc != 0) {
                CERROR ("Can't map RDMA for %s: %d\n",
                        libcfs_nid2str(nid), rc);
                goto failed;
        }

        if (type == IBNAL_MSG_GET_RDMA) {
                /* reply gets finalized when tx completes */
                tx->tx_lntmsg[1] = lnet_create_reply_msg(kibnal_data.kib_ni,
                                                         lntmsg);
                if (tx->tx_lntmsg[1] == NULL) {
                        CERROR ("Can't create reply for GET -> %s\n",
                                libcfs_nid2str(nid));
                        rc = -ENOMEM;
                        goto failed;
                }
        }

        tx->tx_passive_rdma = 1;

        ibmsg = tx->tx_msg;

        ibmsg->ibm_u.rdma.ibrm_hdr = lntmsg->msg_hdr;
        ibmsg->ibm_u.rdma.ibrm_cookie = tx->tx_passive_rdma_cookie;
        ibmsg->ibm_u.rdma.ibrm_desc.rd_key = tx->tx_md.md_rkey;
        ibmsg->ibm_u.rdma.ibrm_desc.rd_addr = tx->tx_md.md_addr;
        ibmsg->ibm_u.rdma.ibrm_desc.rd_nob = nob;

        kibnal_init_tx_msg (tx, type, sizeof (kib_rdma_msg_t));

        CDEBUG(D_NET, "Passive: %p cookie "LPX64", key %x, addr "
               LPX64", nob %d\n",
               tx, tx->tx_passive_rdma_cookie, tx->tx_md.md_rkey,
               tx->tx_md.md_addr, nob);

        /* lntmsg gets finalized when tx completes. */
        tx->tx_lntmsg[0] = lntmsg;

        kibnal_launch_tx(tx, nid);
        return (0);

 failed:
        tx->tx_status = rc;
        kibnal_tx_done (tx);
        return (-EIO);
}

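/* Start an "active" RDMA: perform the RDMA read/write described by the
 * peer's request, bundled with a completion message carrying the
 * peer's cookie and the final status.  Both are posted as one tx, so
 * the completion follows the RDMA on the QP. */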
void
kibnal_start_active_rdma (int type, int status,
                          kib_rx_t *rx, lnet_msg_t *lntmsg,
                          unsigned int niov,
                          struct iovec *iov, lnet_kiov_t *kiov,
                          int offset, int nob)
{
        kib_msg_t    *rxmsg = rx->rx_msg;
        kib_msg_t    *txmsg;
        kib_tx_t     *tx;
        int           access;
        int           rdma_op;
        int           rc;

        CDEBUG(D_NET, "type %d, status %d, niov %d, offset %d, nob %d\n",
               type, status, niov, offset, nob);

        /* Called by scheduler */
        LASSERT (!in_interrupt ());

        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        /* No data if we're completing with failure */
        LASSERT (status == 0 || nob == 0);

        LASSERT (type == IBNAL_MSG_GET_DONE ||
                 type == IBNAL_MSG_PUT_DONE);

        if (type == IBNAL_MSG_GET_DONE) {
                access   = 0;
                rdma_op  = IB_OP_RDMA_WRITE;
                LASSERT (rxmsg->ibm_type == IBNAL_MSG_GET_RDMA);
        } else {
                access   = IB_ACCESS_LOCAL_WRITE;
                rdma_op  = IB_OP_RDMA_READ;
                LASSERT (rxmsg->ibm_type == IBNAL_MSG_PUT_RDMA);
        }

        tx = kibnal_get_idle_tx ();
        if (tx == NULL) {
                CERROR ("tx descs exhausted on RDMA from %s"
                        " completing locally with failure\n",
                        libcfs_nid2str(rx->rx_conn->ibc_peer->ibp_nid));
                lnet_finalize (kibnal_data.kib_ni, lntmsg, -ENOMEM);
                return;
        }
        LASSERT (tx->tx_nsp == 0);

        if (nob != 0) {
                /* We actually need to transfer some data (the transfer
                 * size could get truncated to zero when the incoming
                 * message is matched) */

                if (kiov != NULL)
                        rc = kibnal_map_kiov (tx, access,
                                              niov, kiov, offset, nob);
                else
                        rc = kibnal_map_iov (tx, access,
                                             niov, iov, offset, nob);

                if (rc != 0) {
                        CERROR ("Can't map RDMA -> %s: %d\n",
                                libcfs_nid2str(rx->rx_conn->ibc_peer->ibp_nid),
                                rc);
                        /* We'll skip the RDMA and complete with failure. */
                        status = rc;
                        nob = 0;
                } else {
                        tx->tx_gl[0] = (struct ib_gather_scatter) {
                                .address = tx->tx_md.md_addr,
                                .length  = nob,
                                .key     = tx->tx_md.md_lkey,
                        };

                        tx->tx_sp[0] = (struct ib_send_param) {
                                .work_request_id      = kibnal_ptr2wreqid(tx, 0),
                                .op                   = rdma_op,
                                .gather_list          = &tx->tx_gl[0],
                                .num_gather_entries   = 1,
                                .remote_address       = rxmsg->ibm_u.rdma.ibrm_desc.rd_addr,
                                .rkey                 = rxmsg->ibm_u.rdma.ibrm_desc.rd_key,
                                .device_specific      = NULL,
                                .solicited_event      = 0,
                                .signaled             = 1,
                                .immediate_data_valid = 0,
                                .fence                = 0,
                                .inline_data          = 0,
                        };

                        tx->tx_nsp = 1;
                }
        }

        txmsg = tx->tx_msg;

        txmsg->ibm_u.completion.ibcm_cookie = rxmsg->ibm_u.rdma.ibrm_cookie;
        txmsg->ibm_u.completion.ibcm_status = status;

        kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));

        if (status == 0 && nob != 0) {
                LASSERT (tx->tx_nsp > 1);
                /* RDMA: lntmsg gets finalized when the tx completes.  This
                 * is after the completion message has been sent, which in
                 * turn is after the RDMA has finished. */
                tx->tx_lntmsg[0] = lntmsg;
        } else {
                LASSERT (tx->tx_nsp == 1);
                /* No RDMA: local completion happens now! */
                CDEBUG(D_NET, "No data: immediate completion\n");
                lnet_finalize (kibnal_data.kib_ni, lntmsg,
                              status == 0 ? 0 : -EIO);
        }

        kibnal_queue_tx(tx, rx->rx_conn);
}

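/* LNet send entry point.  Payloads small enough to fit in an IMMEDIATE
 * message are copied and sent inline; large PUT/REPLY payloads, and
 * GETs whose reply would be too big, use passive RDMA instead. */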
int
kibnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
        int               type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      payload_niov = lntmsg->msg_niov;
        struct iovec     *payload_iov = lntmsg->msg_iov;
        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
        unsigned int      payload_offset = lntmsg->msg_offset;
        unsigned int      payload_nob = lntmsg->msg_len;
        kib_msg_t        *ibmsg;
        kib_tx_t         *tx;
        int               nob;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
               payload_nob, payload_niov, libcfs_id2str(target));

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= LNET_MAX_IOV);

        /* Thread context if we're sending payload */
        LASSERT (!in_interrupt() || payload_niov == 0);
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        switch (type) {
        default:
                LBUG();
                return (-EIO);

        case LNET_MSG_ACK:
                LASSERT (payload_nob == 0);
                break;

        case LNET_MSG_GET:
                if (routing || target_is_router)
                        break;                  /* send IMMEDIATE */

                /* is the REPLY message too small for RDMA? */
                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
                if (nob <= IBNAL_MSG_SIZE)
                        break;                  /* send IMMEDIATE */

                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
                        return kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA, lntmsg,
                                                         lntmsg->msg_md->md_niov,
                                                         lntmsg->msg_md->md_iov.iov, NULL,
                                                         lntmsg->msg_md->md_length);

                return kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA, lntmsg,
                                                 lntmsg->msg_md->md_niov,
                                                 NULL, lntmsg->msg_md->md_iov.kiov,
                                                 lntmsg->msg_md->md_length);

        case LNET_MSG_REPLY:
        case LNET_MSG_PUT:
                /* Is the payload small enough not to need RDMA? */
                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
                if (nob <= IBNAL_MSG_SIZE)
                        break;                  /* send IMMEDIATE */

                return kibnal_start_passive_rdma(IBNAL_MSG_PUT_RDMA, lntmsg,
                                                 payload_niov,
                                                 payload_iov, payload_kiov,
                                                 payload_nob);
        }

        /* Send IMMEDIATE */

        tx = kibnal_get_idle_tx();
        if (tx == NULL) {
                CERROR ("Can't send %d to %s: tx descs exhausted%s\n",
                        type, libcfs_nid2str(target.nid),
                        in_interrupt() ? " (intr)" : "");
                return (-ENOMEM);
        }

        ibmsg = tx->tx_msg;
        ibmsg->ibm_u.immediate.ibim_hdr = *hdr;

        if (payload_kiov != NULL)
                lnet_copy_kiov2flat(IBNAL_MSG_SIZE, ibmsg,
                                    offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                    payload_niov, payload_kiov,
                                    payload_offset, payload_nob);
        else
                lnet_copy_iov2flat(IBNAL_MSG_SIZE, ibmsg,
                                   offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                   payload_niov, payload_iov,
                                   payload_offset, payload_nob);

        kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE,
                            offsetof(kib_immediate_msg_t,
                                     ibim_payload[payload_nob]));

        /* lntmsg gets finalized when tx completes */
        tx->tx_lntmsg[0] = lntmsg;

        kibnal_launch_tx(tx, target.nid);
        return (0);
}

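/* Called when LNet wants to delay a receive.  Connections speaking the
 * old protocol version can't have their rx buffer held back (RDMA
 * replies would consume the credit it returns), so the message is
 * dropped instead of queued. */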
int
kibnal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
                   void **new_private)
{
        kib_rx_t    *rx = private;
        kib_conn_t  *conn = rx->rx_conn;

        if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {
                /* Can't block if RDMA completions need normal credits */
                LCONSOLE_ERROR_MSG(0x12a,
                               "Dropping message from %s: no buffers free. "
                               "%s is running an old version of LNET that may "
                               "deadlock if messages wait for buffers\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid),
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                return -EDEADLK;
        }

        *new_private = private;
        return 0;
}

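/* LNet receive entry point: copy an IMMEDIATE payload out of the rx
 * buffer, or start the active RDMA that a GET/PUT request asked for,
 * then re-post the receive buffer. */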
int
kibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
             int delayed, unsigned int niov,
             struct iovec *iov, lnet_kiov_t *kiov,
             unsigned int offset, unsigned int mlen, unsigned int rlen)
{
        kib_rx_t    *rx = private;
        kib_msg_t   *rxmsg = rx->rx_msg;
        int          msg_nob;
        int          rc = 0;

        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt ());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch (rxmsg->ibm_type) {
        default:
                LBUG();

        case IBNAL_MSG_IMMEDIATE:
                msg_nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
                if (msg_nob > rx->rx_nob) {
                        CERROR ("Immediate message from %s too big: %d(%d)\n",
                                libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
                                msg_nob, rx->rx_nob);
                        rc = -EPROTO;
                        break;
                }

                if (kiov != NULL)
                        lnet_copy_flat2kiov(
                                niov, kiov, offset,
                                IBNAL_MSG_SIZE, rxmsg,
                                offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                mlen);
                else
                        lnet_copy_flat2iov(
                                niov, iov, offset,
                                IBNAL_MSG_SIZE, rxmsg,
                                offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                mlen);

                lnet_finalize (ni, lntmsg, 0);
                break;

        case IBNAL_MSG_GET_RDMA:
                if (lntmsg != NULL) {
                        /* GET matched: RDMA lntmsg's payload */
                        kibnal_start_active_rdma(IBNAL_MSG_GET_DONE, 0,
                                                 rx, lntmsg,
                                                 lntmsg->msg_niov,
                                                 lntmsg->msg_iov,
                                                 lntmsg->msg_kiov,
                                                 lntmsg->msg_offset,
                                                 lntmsg->msg_len);
                } else {
                        /* GET didn't match anything */
                        kibnal_start_active_rdma (IBNAL_MSG_GET_DONE, -ENODATA,
                                                  rx, NULL, 0, NULL, NULL, 0, 0);
                }
                break;

        case IBNAL_MSG_PUT_RDMA:
                kibnal_start_active_rdma (IBNAL_MSG_PUT_DONE, 0, rx, lntmsg,
                                          niov, iov, kiov, offset, mlen);
                break;
        }

        kibnal_post_rx(rx, 1, 0);
        return rc;
}

1461 int
1462 kibnal_thread_start (int (*fn)(void *arg), void *arg)
1463 {
1464         long    pid = kernel_thread (fn, arg, 0);
1465
1466         if (pid < 0)
1467                 return ((int)pid);
1468
1469         atomic_inc (&kibnal_data.kib_nthreads);
1470         return (0);
1471 }
1472
1473 void
1474 kibnal_thread_fini (void)
1475 {
1476         atomic_dec (&kibnal_data.kib_nthreads);
1477 }
1478
1479 void
1480 kibnal_peer_alive (kib_peer_t *peer)
1481 {
1482         /* This is racy, but everyone's only writing cfs_time_current() */
1483         peer->ibp_last_alive = cfs_time_current();
1484         mb();
1485 }
1486
1487 void
1488 kibnal_peer_notify (kib_peer_t *peer)
1489 {
1490         time_t        last_alive = 0;
1491         int           error = 0;
1492         unsigned long flags;
1493         
1494         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1495
1496         if (list_empty(&peer->ibp_conns) &&
1497             peer->ibp_accepting == 0 &&
1498             peer->ibp_connecting == 0 &&
1499             peer->ibp_error != 0) {
1500                 error = peer->ibp_error;
1501                 peer->ibp_error = 0;
1502                 last_alive = cfs_time_current_sec() -
1503                              cfs_duration_sec(cfs_time_current() -
1504                                               peer->ibp_last_alive);
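                     /* i.e. convert the jiffies timestamp in ibp_last_alive
                      * into wall-clock seconds for lnet_notify() below */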
1505         }
1506         
1507         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1508         
1509         if (error != 0)
1510                 lnet_notify(kibnal_data.kib_ni, peer->ibp_nid, 0, last_alive);
1511 }
1512
1513 void
1514 kibnal_close_conn_locked (kib_conn_t *conn, int error)
1515 {
1516         /* This just does the immediate housekeeping, and schedules the
1517          * connection for the reaper to finish off.
1518          * Caller holds kib_global_lock exclusively in irq context */
1519         kib_peer_t   *peer = conn->ibc_peer;
1520
1521         CDEBUG (error == 0 ? D_NET : D_NETERROR,
1522                 "closing conn to %s: error %d\n", 
1523                 libcfs_nid2str(peer->ibp_nid), error);
1524         
1525         LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED ||
1526                  conn->ibc_state == IBNAL_CONN_CONNECTING);
1527
1528         if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
1529                 /* kib_reaper_conns takes ibc_list's ref */
1530                 list_del (&conn->ibc_list);
1531         } else {
1532                 /* new ref for kib_reaper_conns */
1533                 kibnal_conn_addref(conn);
1534         }
1535         
1536         if (list_empty (&peer->ibp_conns)) {   /* no more conns */
1537                 if (peer->ibp_persistence == 0 && /* non-persistent peer */
1538                     kibnal_peer_active(peer))     /* still in peer table */
1539                         kibnal_unlink_peer_locked (peer);
1540
1541                 peer->ibp_error = error; /* set/clear error on last conn */
1542         }
1543
1544         conn->ibc_state = IBNAL_CONN_DEATHROW;
1545
1546         /* Schedule conn for closing/destruction */
1547         spin_lock (&kibnal_data.kib_reaper_lock);
1548
1549         list_add_tail (&conn->ibc_list, &kibnal_data.kib_reaper_conns);
1550         wake_up (&kibnal_data.kib_reaper_waitq);
1551                 
1552         spin_unlock (&kibnal_data.kib_reaper_lock);
1553 }
1554
1555 int
1556 kibnal_close_conn (kib_conn_t *conn, int why)
1557 {
1558         unsigned long     flags;
1559         int               count = 0;
1560
1561         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1562
1563         LASSERT (conn->ibc_state >= IBNAL_CONN_CONNECTING);
1564         
1565         if (conn->ibc_state <= IBNAL_CONN_ESTABLISHED) {
1566                 count = 1;
1567                 kibnal_close_conn_locked (conn, why);
1568         }
1569         
1570         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1571         return (count);
1572 }
1573
1574 void
1575 kibnal_peer_connect_failed (kib_peer_t *peer, int active, int error)
1576 {
1577         LIST_HEAD        (zombies);
1578         unsigned long     flags;
1579
1580         LASSERT(error != 0);
1581
1582         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1583
1584         if (active) {
1585                 LASSERT (peer->ibp_connecting != 0);
1586                 peer->ibp_connecting--;
1587         } else {
1588                 LASSERT (peer->ibp_accepting != 0);
1589                 peer->ibp_accepting--;
1590         }
1591
1592         if (peer->ibp_connecting != 0 ||
1593             peer->ibp_accepting != 0) {
1594                 /* another connection attempt under way... */
1595                 write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1596                 return;
1597         }
1598
1599         if (list_empty(&peer->ibp_conns)) {
1600                 /* Say when active connection can be re-attempted */
1601                 peer->ibp_reconnect_interval *= 2;
1602                 peer->ibp_reconnect_interval =
1603                         MAX(peer->ibp_reconnect_interval,
1604                             *kibnal_tunables.kib_min_reconnect_interval);
1605                 peer->ibp_reconnect_interval =
1606                         MIN(peer->ibp_reconnect_interval,
1607                             *kibnal_tunables.kib_max_reconnect_interval);
1608                 
1609                 peer->ibp_reconnect_time = jiffies + 
1610                                            peer->ibp_reconnect_interval * HZ;
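                     /* e.g. with min/max reconnect intervals of 1s/60s the
                      * retry delays run 1, 2, 4, ... 32, 60, 60s; the
                      * interval is zeroed on a successful connection, so
                      * the doubling restarts from the minimum after the
                      * first failure */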
1611         
1612                 /* Take peer's blocked transmits; I'll complete
1613                  * them with error */
1614                 list_add(&zombies, &peer->ibp_tx_queue);
1615                 list_del_init(&peer->ibp_tx_queue);
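                     /* (list_add() of the local 'zombies' head followed by
                      * list_del_init() of the queue head splices the whole
                      * ibp_tx_queue onto 'zombies' in O(1)) */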
1616                 
1617                 if (kibnal_peer_active(peer) &&
1618                     (peer->ibp_persistence == 0)) {
1619                         /* failed connection attempt on non-persistent peer */
1620                         kibnal_unlink_peer_locked (peer);
1621                 }
1622
1623                 peer->ibp_error = error;
1624         } else {
1625                 /* Can't have blocked transmits if there are connections */
1626                 LASSERT (list_empty(&peer->ibp_tx_queue));
1627         }
1628         
1629         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1630
1631         kibnal_peer_notify(peer);
1632         
1633         if (!list_empty (&zombies))
1634                 CDEBUG (D_NETERROR, "Deleting messages for %s: connection failed\n",
1635                         libcfs_nid2str(peer->ibp_nid));
1636
1637         kibnal_txlist_done(&zombies, -EHOSTUNREACH);
1638 }
1639
1640 void
1641 kibnal_connreq_done (kib_conn_t *conn, int active, int status)
1642 {
1643         int               state = conn->ibc_state;
1644         kib_peer_t       *peer = conn->ibc_peer;
1645         kib_tx_t         *tx;
1646         unsigned long     flags;
1647         int               rc;
1648         int               i;
1649
1650         if (conn->ibc_connreq != NULL) {
1651                 LIBCFS_FREE (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
1652                 conn->ibc_connreq = NULL;
1653         }
1654
1655         switch (state) {
1656         case IBNAL_CONN_CONNECTING:
1657                 /* conn has a CM comm_id */
1658                 if (status == 0) {
1659                         /* Install common (active/passive) callback for
1660                          * disconnect/idle notification */
1661                         rc = tsIbCmCallbackModify(conn->ibc_comm_id, 
1662                                                   kibnal_conn_callback,
1663                                                   conn);
1664                         LASSERT (rc == 0);
1665                 } else {
1666                         /* LASSERT (no more CM callbacks) */
1667                         rc = tsIbCmCallbackModify(conn->ibc_comm_id,
1668                                                   kibnal_bad_conn_callback,
1669                                                   conn);
1670                         LASSERT (rc == 0);
1671                 }
1672                 break;
1673                 
1674         case IBNAL_CONN_INIT_QP:
1675                 LASSERT (status != 0);
1676                 break;
1677                 
1678         default:
1679                 LBUG();
1680         }
1681         
1682         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1683
1684         if (active)
1685                 LASSERT (peer->ibp_connecting != 0);
1686         else
1687                 LASSERT (peer->ibp_accepting != 0);
1688         
1689         if (status == 0 &&                      /* connection established */
1690             kibnal_peer_active(peer)) {         /* peer not deleted */
1691
1692                 if (active)
1693                         peer->ibp_connecting--;
1694                 else
1695                         peer->ibp_accepting--;
1696
1697                 conn->ibc_last_send = jiffies;
1698                 conn->ibc_state = IBNAL_CONN_ESTABLISHED;
1699                 kibnal_peer_alive(peer);
1700
1701                 /* +1 ref for ibc_list; the caller's (i.e. the CM's) ref
1702                  * remains until the IB_CM_IDLE callback */
1703                 kibnal_conn_addref(conn);
1704                 list_add (&conn->ibc_list, &peer->ibp_conns);
1705
1706                 peer->ibp_reconnect_interval = 0; /* OK to reconnect at any time */
1707
1708                 /* post blocked sends to the new connection */
1709                 spin_lock (&conn->ibc_lock);
1710                 
1711                 while (!list_empty (&peer->ibp_tx_queue)) {
1712                         tx = list_entry (peer->ibp_tx_queue.next, 
1713                                          kib_tx_t, tx_list);
1714                         
1715                         list_del (&tx->tx_list);
1716
1717                         kibnal_queue_tx_locked (tx, conn);
1718                 }
1719                 
1720                 spin_unlock (&conn->ibc_lock);
1721
1722                 /* Nuke any dangling conns from a different peer instance... */
1723                 kibnal_close_stale_conns_locked (conn->ibc_peer,
1724                                                  conn->ibc_incarnation);
1725
1726                 write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1727
1728                 /* queue up all the receives */
1729                 for (i = 0; i < IBNAL_RX_MSGS; i++) {
1730                         /* +1 ref for rx desc */
1731                         kibnal_conn_addref(conn);
1732
1733                         CDEBUG(D_NET, "RX[%d] %p->%p - "LPX64"\n",
1734                                i, &conn->ibc_rxs[i], conn->ibc_rxs[i].rx_msg,
1735                                conn->ibc_rxs[i].rx_vaddr);
1736
1737                         kibnal_post_rx (&conn->ibc_rxs[i], 0, 0);
1738                 }
1739
1740                 kibnal_check_sends (conn);
1741                 return;
1742         }
1743
1744         if (status == 0) {
1745                 /* connection established, but peer was deleted.  Schedule for
1746                  * reaper to cm_disconnect... */
1747                 status = -ECONNABORTED;
1748                 kibnal_close_conn_locked (conn, status);
1749         } else {
1750                 /* just waiting for refs to drain */
1751                 conn->ibc_state = IBNAL_CONN_ZOMBIE;
1752         } 
1753
1754         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1755
1756         kibnal_peer_connect_failed (conn->ibc_peer, active, status);
1757 }
1758
1759 int
1760 kibnal_accept_connreq (kib_conn_t **connp, tTS_IB_CM_COMM_ID cid,
1761                        kib_msg_t *msg, int nob)
1762 {
1763         kib_conn_t    *conn;
1764         kib_peer_t    *peer;
1765         kib_peer_t    *peer2;
1766         unsigned long  flags;
1767         int            rc;
1768
1769         rc = kibnal_unpack_msg(msg, 0, nob);
1770         if (rc != 0) {
1771                 CERROR("Can't unpack connreq msg: %d\n", rc);
1772                 return -EPROTO;
1773         }
1774
1775         CDEBUG(D_NET, "connreq from %s\n", libcfs_nid2str(msg->ibm_srcnid));
1776
1777         if (msg->ibm_type != IBNAL_MSG_CONNREQ) {
1778                 CERROR("Unexpected connreq msg type: %x from %s\n",
1779                        msg->ibm_type, libcfs_nid2str(msg->ibm_srcnid));
1780                 return -EPROTO;
1781         }
1782                 
1783         if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
1784                 CERROR("Can't accept %s: bad queue depth %d (%d expected)\n",
1785                        libcfs_nid2str(msg->ibm_srcnid), 
1786                        msg->ibm_u.connparams.ibcp_queue_depth, 
1787                        IBNAL_MSG_QUEUE_SIZE);
1788                 return (-EPROTO);
1789         }
1790         
1791         conn = kibnal_create_conn();
1792         if (conn == NULL)
1793                 return (-ENOMEM);
1794
1795         /* assume the source NID is a new peer */
1796         rc = kibnal_create_peer(&peer, msg->ibm_srcnid);
1797         if (rc != 0) {
1798                 kibnal_conn_decref(conn);
1799                 return (-ENOMEM);
1800         }
1801         
1802         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1803
1804         if (kibnal_data.kib_nonewpeers) {
1805                 write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1806                 
1807                 CERROR ("Shutdown has started, drop connreq from %s\n",
1808                         libcfs_nid2str(msg->ibm_srcnid));
1809                 kibnal_conn_decref(conn);
1810                 kibnal_peer_decref(peer);
1811                 return -ESHUTDOWN;
1812         }
1813
1814         /* Check I'm the same instance that gave the connection parameters.  
1815          * NB If my incarnation changes after this, the peer will get nuked and
1816          * we'll spot that when the connection is finally added into the peer's
1817          * connlist */
1818         if (kibnal_data.kib_ni->ni_nid != msg->ibm_dstnid ||
1819             msg->ibm_dststamp != kibnal_data.kib_incarnation) {
1820                 write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1821                 
1822                 CERROR("Stale connection params from %s\n",
1823                        libcfs_nid2str(msg->ibm_srcnid));
1824                 kibnal_conn_decref(conn);
1825                 kibnal_peer_decref(peer);
1826                 return -ESTALE;
1827         }
1828
1829         peer2 = kibnal_find_peer_locked(msg->ibm_srcnid);
1830         if (peer2 == NULL) {
1831                 /* Brand new peer */
1832                 LASSERT (peer->ibp_accepting == 0);
1833
1834                 /* peer table takes my ref on peer */
1835                 list_add_tail (&peer->ibp_list,
1836                                kibnal_nid2peerlist(msg->ibm_srcnid));
1837         } else {
1838                 /* tie-break connection race in favour of the higher NID */                
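                     /* (both peers connect to each other at once; the
                      * request initiated by the lower-NID side is rejected
                      * here, so exactly one of the two connections
                      * survives) */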
1839                 if (peer2->ibp_connecting != 0 &&
1840                     msg->ibm_srcnid < kibnal_data.kib_ni->ni_nid) {
1841                         write_unlock_irqrestore(&kibnal_data.kib_global_lock,
1842                                                 flags);
1843                         CWARN("Conn race %s\n",
1844                               libcfs_nid2str(peer2->ibp_nid));
1845
1846                         kibnal_conn_decref(conn);
1847                         kibnal_peer_decref(peer);
1848                         return -EALREADY;
1849                 }
1850
1851                 kibnal_peer_decref(peer);
1852                 peer = peer2;
1853         }
1854
1855         /* +1 ref for conn */
1856         kibnal_peer_addref(peer);
1857         peer->ibp_accepting++;
1858
1859         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1860
1861         conn->ibc_peer = peer;
1862         conn->ibc_state = IBNAL_CONN_CONNECTING;
1863         conn->ibc_comm_id = cid;
1864         conn->ibc_incarnation = msg->ibm_srcstamp;
1865         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
1866         conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
1867         conn->ibc_version = msg->ibm_version;
1868
1869         *connp = conn;
1870         return (0);
1871 }
1872
1873 tTS_IB_CM_CALLBACK_RETURN
1874 kibnal_bad_conn_callback (tTS_IB_CM_EVENT event,
1875                           tTS_IB_CM_COMM_ID cid,
1876                           void *param,
1877                           void *arg)
1878 {
1879         CERROR ("Unexpected event %d: conn %p\n", event, arg);
1880         LBUG ();
1881         return TS_IB_CM_CALLBACK_PROCEED;
1882 }
1883
1884 void
1885 kibnal_abort_txs (kib_conn_t *conn, struct list_head *txs)
1886 {
1887         LIST_HEAD        (zombies); 
1888         struct list_head *tmp;
1889         struct list_head *nxt;
1890         kib_tx_t         *tx;
1891         unsigned long     flags;
1892
1893         spin_lock_irqsave (&conn->ibc_lock, flags);
1894
1895         list_for_each_safe (tmp, nxt, txs) {
1896                 tx = list_entry (tmp, kib_tx_t, tx_list);
1897
1898                 if (txs == &conn->ibc_active_txs) {
1899                         LASSERT (tx->tx_passive_rdma ||
1900                                  !tx->tx_passive_rdma_wait);
1901
1902                         LASSERT (tx->tx_passive_rdma_wait ||
1903                                  tx->tx_sending != 0);
1904                 } else {
1905                         LASSERT (!tx->tx_passive_rdma_wait);
1906                         LASSERT (tx->tx_sending == 0);
1907                 }
1908
1909                 tx->tx_status = -ECONNABORTED;
1910                 tx->tx_passive_rdma_wait = 0;
1911
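                     /* txs still on the wire (tx_sending != 0) are left in
                      * place for the send-completion path to finish once it
                      * sees the error status */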
1912                 if (tx->tx_sending == 0) {
1913                         list_del (&tx->tx_list);
1914                         list_add (&tx->tx_list, &zombies);
1915                 }
1916         }
1917         
1918         spin_unlock_irqrestore (&conn->ibc_lock, flags);
1919
1920         kibnal_txlist_done (&zombies, -ECONNABORTED);
1921 }
1922
1923 tTS_IB_CM_CALLBACK_RETURN
1924 kibnal_conn_callback (tTS_IB_CM_EVENT event,
1925                       tTS_IB_CM_COMM_ID cid,
1926                       void *param,
1927                       void *arg)
1928 {
1929         kib_conn_t       *conn = arg;
1930         int               rc;
1931
1932         /* Established Connection Notifier */
1933
1934         switch (event) {
1935         default:
1936                 CDEBUG(D_NETERROR, "Connection %p -> %s ERROR %d\n",
1937                        conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), event);
1938                 kibnal_close_conn (conn, -ECONNABORTED);
1939                 break;
1940                 
1941         case TS_IB_CM_DISCONNECTED:
1942                 CDEBUG(D_NETERROR, "Connection %p -> %s DISCONNECTED.\n",
1943                        conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
1944                 kibnal_close_conn (conn, 0);
1945                 break;
1946
1947         case TS_IB_CM_IDLE:
1948                 CDEBUG(D_NET, "Connection %p -> %s IDLE.\n",
1949                        conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
1950
1951                 /* LASSERT (no further callbacks) */
1952                 rc = tsIbCmCallbackModify(cid, kibnal_bad_conn_callback, conn);
1953                 LASSERT (rc == 0);
1954
1955                 /* NB we wait until the connection has closed before
1956                  * completing outstanding passive RDMAs so we can be sure
1957                  * the network can't touch the mapped memory any more. */
1958
1959                 kibnal_abort_txs(conn, &conn->ibc_tx_queue);
1960                 kibnal_abort_txs(conn, &conn->ibc_tx_queue_rsrvd);
1961                 kibnal_abort_txs(conn, &conn->ibc_tx_queue_nocred);
1962                 kibnal_abort_txs(conn, &conn->ibc_active_txs);
1963                 
1964                 kibnal_conn_decref(conn);        /* Lose CM's ref */
1965                 break;
1966         }
1967
1968         return TS_IB_CM_CALLBACK_PROCEED;
1969 }
1970
1971 tTS_IB_CM_CALLBACK_RETURN
1972 kibnal_passive_conn_callback (tTS_IB_CM_EVENT event,
1973                               tTS_IB_CM_COMM_ID cid,
1974                               void *param,
1975                               void *arg)
1976 {
1977         kib_conn_t  *conn = arg;
1978         int          rc;
1979         
1980         switch (event) {
1981         default:
1982                 if (conn == NULL) {
1983                         /* no connection yet */
1984                         CERROR ("Unexpected event: %d\n", event);
1985                         return TS_IB_CM_CALLBACK_ABORT;
1986                 }
1987                 
1988                 CERROR ("%s event %p -> %s: %d\n",
1989                         (event == TS_IB_CM_IDLE) ? "IDLE" : "Unexpected",
1990                         conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), event);
1991                 kibnal_connreq_done(conn, 0, -ECONNABORTED);
1992                 kibnal_conn_decref(conn); /* drop CM's ref */
1993                 return TS_IB_CM_CALLBACK_ABORT;
1994                 
1995         case TS_IB_CM_REQ_RECEIVED: {
1996                 struct ib_cm_req_received_param *req = param;
1997                 kib_msg_t                       *msg = req->remote_private_data;
1998
1999                 LASSERT (conn == NULL);
2000
2001                 /* Don't really know srcnid until successful unpack */
2002                 CDEBUG(D_NET, "REQ from ?%s?\n", libcfs_nid2str(msg->ibm_srcnid));
2003
2004                 rc = kibnal_accept_connreq(&conn, cid, msg, 
2005                                            req->remote_private_data_len);
2006                 if (rc != 0) {
2007                         CERROR ("Can't accept ?%s?: %d\n",
2008                                 libcfs_nid2str(msg->ibm_srcnid), rc);
2009                         return TS_IB_CM_CALLBACK_ABORT;
2010                 }
2011
2012                 /* update 'arg' for next callback */
2013                 rc = tsIbCmCallbackModify(cid, kibnal_passive_conn_callback, conn);
2014                 LASSERT (rc == 0);
2015
2016                 msg = req->accept_param.reply_private_data;
2017                 kibnal_init_msg(msg, IBNAL_MSG_CONNACK,
2018                                 sizeof(msg->ibm_u.connparams));
2019
2020                 msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
2021
2022                 kibnal_pack_msg(msg, conn->ibc_version, 0, 
2023                                 conn->ibc_peer->ibp_nid, 
2024                                 conn->ibc_incarnation);
2025
2026                 req->accept_param.qp                     = conn->ibc_qp;
2027                 req->accept_param.reply_private_data_len = msg->ibm_nob;
2028                 req->accept_param.responder_resources    = IBNAL_RESPONDER_RESOURCES;
2029                 req->accept_param.initiator_depth        = IBNAL_RESPONDER_RESOURCES;
2030                 req->accept_param.rnr_retry_count        = IBNAL_RNR_RETRY;
2031                 req->accept_param.flow_control           = IBNAL_FLOW_CONTROL;
2032
2033                 CDEBUG(D_NET, "Proceeding\n");
2034                 return TS_IB_CM_CALLBACK_PROCEED; /* CM takes my ref on conn */
2035         }
2036
2037         case TS_IB_CM_ESTABLISHED:
2038                 LASSERT (conn != NULL);
2039                 CWARN("Connection %p -> %s ESTABLISHED.\n",
2040                        conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
2041
2042                 kibnal_connreq_done(conn, 0, 0);
2043                 return TS_IB_CM_CALLBACK_PROCEED;
2044         }
2045 }
2046
2047 tTS_IB_CM_CALLBACK_RETURN
2048 kibnal_active_conn_callback (tTS_IB_CM_EVENT event,
2049                              tTS_IB_CM_COMM_ID cid,
2050                              void *param,
2051                              void *arg)
2052 {
2053         kib_conn_t    *conn = arg;
2054         unsigned long  flags;
2055
2056         switch (event) {
2057         case TS_IB_CM_REP_RECEIVED: {
2058                 struct ib_cm_rep_received_param *rep = param;
2059                 kib_msg_t                       *msg = rep->remote_private_data;
2060                 int                              nob = rep->remote_private_data_len;
2061                 int                              rc;
2062
2063                 rc = kibnal_unpack_msg(msg, conn->ibc_version, nob);
2064                 if (rc != 0) {
2065                         CERROR ("Error %d unpacking conn ack from %s\n",
2066                                 rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
2067                         kibnal_connreq_done(conn, 1, rc);
2068                         kibnal_conn_decref(conn); /* drop CM's ref */
2069                         return TS_IB_CM_CALLBACK_ABORT;
2070                 }
2071
2072                 if (msg->ibm_type != IBNAL_MSG_CONNACK) {
2073                         CERROR ("Unexpected conn ack type %d from %s\n",
2074                                 msg->ibm_type, 
2075                                 libcfs_nid2str(conn->ibc_peer->ibp_nid));
2076                         kibnal_connreq_done(conn, 1, -EPROTO);
2077                         kibnal_conn_decref(conn); /* drop CM's ref */
2078                         return TS_IB_CM_CALLBACK_ABORT;
2079                 }
2080
2081                 if (conn->ibc_peer->ibp_nid != msg->ibm_srcnid ||
2082                     kibnal_data.kib_ni->ni_nid != msg->ibm_dstnid ||
2083                     msg->ibm_srcstamp != conn->ibc_incarnation ||
2084                     msg->ibm_dststamp != kibnal_data.kib_incarnation) {
2085                         CERROR("Stale conn ack from %s\n",
2086                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
2087                         kibnal_connreq_done(conn, 1, -ESTALE);
2088                         kibnal_conn_decref(conn); /* drop CM's ref */
2089                         return TS_IB_CM_CALLBACK_ABORT;
2090                 }
2091
2092                 if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
2093                         CERROR ("Bad queue depth %d from %s\n",
2094                                 msg->ibm_u.connparams.ibcp_queue_depth,
2095                                 libcfs_nid2str(conn->ibc_peer->ibp_nid));
2096                         kibnal_connreq_done(conn, 1, -EPROTO);
2097                         kibnal_conn_decref(conn); /* drop CM's ref */
2098                         return TS_IB_CM_CALLBACK_ABORT;
2099                 }
2100                                 
2101                 CDEBUG(D_NET, "Connection %p -> %s REP_RECEIVED.\n",
2102                        conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
2103
2104                 conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2105                 conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
2106                 return TS_IB_CM_CALLBACK_PROCEED;
2107         }
2108
2109         case TS_IB_CM_ESTABLISHED:
2110                 CWARN("Connection %p -> %s ESTABLISHED\n",
2111                        conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
2112
2113                 kibnal_connreq_done(conn, 1, 0);
2114                 return TS_IB_CM_CALLBACK_PROCEED;
2115
2116         case TS_IB_CM_IDLE:
2117                 CDEBUG(D_NETERROR, "Connection %p -> %s IDLE\n",
2118                        conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
2119                 /* I assume this connection attempt was rejected because the
2120                  * peer found a stale QP; I'll just try again */
2121                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2122                 kibnal_schedule_active_connect_locked(conn->ibc_peer);
2123                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2124
2125                 kibnal_connreq_done(conn, 1, -ECONNABORTED);
2126                 kibnal_conn_decref(conn); /* drop CM's ref */
2127                 return TS_IB_CM_CALLBACK_ABORT;
2128
2129         default:
2130                 CDEBUG(D_NETERROR, "Connection %p -> %s ERROR %d\n",
2131                        conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), event);
2132                 kibnal_connreq_done(conn, 1, -ECONNABORTED);
2133                 kibnal_conn_decref(conn); /* drop CM's ref */
2134                 return TS_IB_CM_CALLBACK_ABORT;
2135         }
2136 }
2137
2138 int
2139 kibnal_pathreq_callback (tTS_IB_CLIENT_QUERY_TID tid, int status,
2140                           struct ib_path_record *resp, int remaining,
2141                           void *arg)
2142 {
2143         kib_conn_t *conn = arg;
2144         kib_peer_t *peer = conn->ibc_peer;
2145         kib_msg_t  *msg = &conn->ibc_connreq->cr_msg;
2146
2147         if (status != 0) {
2148                 CDEBUG (D_NETERROR, "Pathreq %p -> %s failed: %d\n",
2149                         conn, libcfs_nid2str(peer->ibp_nid), status);
2150                 kibnal_connreq_done(conn, 1, status);
2151                 kibnal_conn_decref(conn); /* drop callback's ref */
2152                 return 1;    /* non-zero prevents further callbacks */
2153         }
2154
2155         conn->ibc_connreq->cr_path = *resp;
2156
2157         kibnal_init_msg(msg, IBNAL_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
2158         msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
2159         kibnal_pack_msg(msg, conn->ibc_version, 0, 
2160                         peer->ibp_nid, conn->ibc_incarnation);
2161
2162         conn->ibc_connreq->cr_connparam = (struct ib_cm_active_param) {
2163                 .qp                   = conn->ibc_qp,
2164                 .req_private_data     = msg,
2165                 .req_private_data_len = msg->ibm_nob,
2166                 .responder_resources  = IBNAL_RESPONDER_RESOURCES,
2167                 .initiator_depth      = IBNAL_RESPONDER_RESOURCES,
2168                 .retry_count          = IBNAL_RETRY,
2169                 .rnr_retry_count      = IBNAL_RNR_RETRY,
2170                 .cm_response_timeout  = *kibnal_tunables.kib_timeout,
2171                 .max_cm_retries       = IBNAL_CM_RETRY,
2172                 .flow_control         = IBNAL_FLOW_CONTROL,
2173         };
2174
2175         /* XXX set timeout just like SDP!!! */
2176         conn->ibc_connreq->cr_path.packet_life = 13;
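             /* (path-record packet lifetimes are 4.096us << value, so 13
              * presumably gives the ~34ms lifetime that SDP uses) */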
2177         
2178         /* Flag I'm getting involved with the CM... */
2179         conn->ibc_state = IBNAL_CONN_CONNECTING;
2180
2181         CDEBUG(D_NET, "Connecting to service id "LPX64" on %s\n",
2182                conn->ibc_connreq->cr_svcrsp.ibsr_svc_id, 
2183                libcfs_nid2str(peer->ibp_nid));
2184
2185         /* kibnal_connect_callback gets my conn ref */
2186         status = ib_cm_connect (&conn->ibc_connreq->cr_connparam, 
2187                                 &conn->ibc_connreq->cr_path, NULL,
2188                                 conn->ibc_connreq->cr_svcrsp.ibsr_svc_id, 0,
2189                                 kibnal_active_conn_callback, conn,
2190                                 &conn->ibc_comm_id);
2191         if (status != 0) {
2192                 CERROR ("Connect %p -> %s failed: %d\n",
2193                         conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), status);
2194                 /* Back out state change: I've not got a CM comm_id yet... */
2195                 conn->ibc_state = IBNAL_CONN_INIT_QP;
2196                 kibnal_connreq_done(conn, 1, status);
2197                 kibnal_conn_decref(conn); /* Drop callback's ref */
2198         }
2199         
2200         return 1;    /* non-zero to prevent further callbacks */
2201 }
2202
2203 void
2204 kibnal_connect_peer (kib_peer_t *peer)
2205 {
2206         kib_conn_t  *conn;
2207         int          rc;
2208
2209         conn = kibnal_create_conn();
2210         if (conn == NULL) {
2211                 CERROR ("Can't allocate conn\n");
2212                 kibnal_peer_connect_failed (peer, 1, -ENOMEM);
2213                 return;
2214         }
2215
2216         conn->ibc_peer = peer;
2217         kibnal_peer_addref(peer);
2218
2219         LIBCFS_ALLOC (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
2220         if (conn->ibc_connreq == NULL) {
2221                 CERROR ("Can't allocate connreq\n");
2222                 kibnal_connreq_done(conn, 1, -ENOMEM);
2223                 kibnal_conn_decref(conn); /* drop my ref */
2224                 return;
2225         }
2226
2227         memset(conn->ibc_connreq, 0, sizeof (*conn->ibc_connreq));
2228
2229         rc = kibnal_make_svcqry(conn);
2230         if (rc != 0) {
2231                 kibnal_connreq_done (conn, 1, rc);
2232                 kibnal_conn_decref(conn); /* drop my ref */
2233                 return;
2234         }
2235
2236         rc = ib_cached_gid_get(kibnal_data.kib_device,
2237                                kibnal_data.kib_port, 0,
2238                                conn->ibc_connreq->cr_gid);
2239         LASSERT (rc == 0);
2240
2241         /* kibnal_pathreq_callback gets my conn ref */
2242         rc = tsIbPathRecordRequest (kibnal_data.kib_device,
2243                                     kibnal_data.kib_port,
2244                                     conn->ibc_connreq->cr_gid,
2245                                     conn->ibc_connreq->cr_svcrsp.ibsr_svc_gid,
2246                                     conn->ibc_connreq->cr_svcrsp.ibsr_svc_pkey,
2247                                     0,
2248                                     *kibnal_tunables.kib_timeout * HZ,
2249                                     0,
2250                                     kibnal_pathreq_callback, conn, 
2251                                     &conn->ibc_connreq->cr_tid);
2252         if (rc == 0)
2253                 return; /* callback now has my ref on conn */
2254
2255         CERROR ("Path record request %p -> %s failed: %d\n",
2256                 conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
2257         kibnal_connreq_done(conn, 1, rc);
2258         kibnal_conn_decref(conn); /* drop my ref */
2259 }
2260
2261 int
2262 kibnal_check_txs (kib_conn_t *conn, struct list_head *txs)
2263 {
2264         kib_tx_t          *tx;
2265         struct list_head  *ttmp;
2266         unsigned long      flags;
2267         int                timed_out = 0;
2268
2269         spin_lock_irqsave (&conn->ibc_lock, flags);
2270
2271         list_for_each (ttmp, txs) {
2272                 tx = list_entry (ttmp, kib_tx_t, tx_list);
2273
2274                 if (txs == &conn->ibc_active_txs) {
2275                         LASSERT (tx->tx_passive_rdma ||
2276                                  !tx->tx_passive_rdma_wait);
2277
2278                         LASSERT (tx->tx_passive_rdma_wait ||
2279                                  tx->tx_sending != 0);
2280                 } else {
2281                         LASSERT (!tx->tx_passive_rdma_wait);
2282                         LASSERT (tx->tx_sending == 0);
2283                 }
2284                 
2285                 if (time_after_eq (jiffies, tx->tx_deadline)) {
2286                         timed_out = 1;
2287                         break;
2288                 }
2289         }
2290
2291         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2292         return timed_out;
2293 }
2294
2295 int
2296 kibnal_conn_timed_out (kib_conn_t *conn)
2297 {
2298         return  kibnal_check_txs(conn, &conn->ibc_tx_queue) ||
2299                 kibnal_check_txs(conn, &conn->ibc_tx_queue_rsrvd) ||
2300                 kibnal_check_txs(conn, &conn->ibc_tx_queue_nocred) ||
2301                 kibnal_check_txs(conn, &conn->ibc_active_txs);
2302 }
2303
2304 void
2305 kibnal_check_conns (int idx)
2306 {
2307         struct list_head  *peers = &kibnal_data.kib_peers[idx];
2308         struct list_head  *ptmp;
2309         kib_peer_t        *peer;
2310         kib_conn_t        *conn;
2311         struct list_head  *ctmp;
2312         unsigned long      flags;
2313
2314  again:
2315         /* NB. We expect to have a look at all the peers and not find any
2316          * rdmas to time out, so we just use a shared lock while we
2317          * take a look... */
2318         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2319
2320         list_for_each (ptmp, peers) {
2321                 peer = list_entry (ptmp, kib_peer_t, ibp_list);
2322
2323                 list_for_each (ctmp, &peer->ibp_conns) {
2324                         conn = list_entry (ctmp, kib_conn_t, ibc_list);
2325
2326                         LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);
2327
2328
2329                         /* In case we have enough credits to return via a
2330                          * NOOP, but there were no non-blocking tx descs
2331                          * free to do it last time... */
2332                         kibnal_check_sends(conn);
2333
2334                         if (!kibnal_conn_timed_out(conn))
2335                                 continue;
2336                         
2337                         kibnal_conn_addref(conn);
2338
2339                         read_unlock_irqrestore(&kibnal_data.kib_global_lock,
2340                                                flags);
2341
2342                         CERROR("Timed out RDMA with %s\n",
2343                                libcfs_nid2str(peer->ibp_nid));
2344
2345                         kibnal_close_conn (conn, -ETIMEDOUT);
2346                         kibnal_conn_decref(conn);
2347
2348                         /* start again now I've dropped the lock */
2349                         goto again;
2350                 }
2351         }
2352
2353         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2354 }
2355
2356 void
2357 kibnal_terminate_conn (kib_conn_t *conn)
2358 {
2359         int           rc;
2360
2361         CDEBUG(D_NET, "conn %p\n", conn);
2362         LASSERT (conn->ibc_state == IBNAL_CONN_DEATHROW);
2363         conn->ibc_state = IBNAL_CONN_ZOMBIE;
2364
2365         rc = ib_cm_disconnect (conn->ibc_comm_id);
2366         if (rc != 0)
2367                 CERROR ("Error %d disconnecting conn %p -> %s\n",
2368                         rc, conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
2369
2370         kibnal_peer_notify(conn->ibc_peer);
2371 }
2372
2373 int
2374 kibnal_reaper (void *arg)
2375 {
2376         wait_queue_t       wait;
2377         unsigned long      flags;
2378         kib_conn_t        *conn;
2379         int                timeout;
2380         int                i;
2381         int                peer_index = 0;
2382         unsigned long      deadline = jiffies;
2383         
2384         cfs_daemonize ("kibnal_reaper");
2385         cfs_block_allsigs ();
2386
2387         init_waitqueue_entry (&wait, current);
2388
2389         spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
2390
2391         while (!kibnal_data.kib_shutdown) {
2392                 if (!list_empty (&kibnal_data.kib_reaper_conns)) {
2393                         conn = list_entry (kibnal_data.kib_reaper_conns.next,
2394                                            kib_conn_t, ibc_list);
2395                         list_del (&conn->ibc_list);
2396                         
2397                         spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);
2398
2399                         switch (conn->ibc_state) {
2400                         case IBNAL_CONN_DEATHROW:
2401                                 LASSERT (conn->ibc_comm_id != TS_IB_CM_COMM_ID_INVALID);
2402                                 /* Disconnect: conn becomes a zombie in the
2403                                  * callback and last ref reschedules it
2404                                  * here... */
2405                                 kibnal_terminate_conn(conn);
2406                                 kibnal_conn_decref(conn);
2407                                 break;
2408
2409                         case IBNAL_CONN_INIT_QP:
2410                         case IBNAL_CONN_ZOMBIE:
2411                                 kibnal_destroy_conn (conn);
2412                                 break;
2413                                 
2414                         default:
2415                                 CERROR ("Bad conn %p state: %d\n",
2416                                         conn, conn->ibc_state);
2417                                 LBUG();
2418                         }
2419
2420                         spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
2421                         continue;
2422                 }
2423
2424                 spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);
2425
2426                 /* careful with the jiffy wrap... */
2427                 while ((timeout = (int)(deadline - jiffies)) <= 0) {
2428                         const int n = 4;
2429                         const int p = 1;
2430                         int       chunk = kibnal_data.kib_peer_hash_size;
2431                         
2432                         /* Time to check for RDMA timeouts on a few more
2433                          * peers: I do checks every 'p' seconds on a
2434                          * proportion of the peer table and I need to check
2435                          * every connection 'n' times within a timeout
2436                          * interval, to ensure I detect a timeout on any
2437                          * connection within (n+1)/n times the timeout
2438                          * interval. */
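                             /* e.g. timeout 50s with 101 hash buckets:
                              * chunk = (101 * 4 * 1) / 50 = 8 buckets per
                              * second, so the whole table is scanned every
                              * ~13s, about n = 4 times per timeout
                              * interval */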
2439
2440                         if (*kibnal_tunables.kib_timeout > n * p)
2441                                 chunk = (chunk * n * p) / 
2442                                         *kibnal_tunables.kib_timeout;
2443                         if (chunk == 0)
2444                                 chunk = 1;
2445
2446                         for (i = 0; i < chunk; i++) {
2447                                 kibnal_check_conns (peer_index);
2448                                 peer_index = (peer_index + 1) % 
2449                                              kibnal_data.kib_peer_hash_size;
2450                         }
2451
2452                         deadline += p * HZ;
2453                 }
2454
2455                 kibnal_data.kib_reaper_waketime = jiffies + timeout;
2456
2457                 set_current_state (TASK_INTERRUPTIBLE);
2458                 add_wait_queue (&kibnal_data.kib_reaper_waitq, &wait);
2459
2460                 schedule_timeout (timeout);
2461
2462                 set_current_state (TASK_RUNNING);
2463                 remove_wait_queue (&kibnal_data.kib_reaper_waitq, &wait);
2464
2465                 spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
2466         }
2467
2468         spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);
2469
2470         kibnal_thread_fini ();
2471         return (0);
2472 }
2473
2474 int
2475 kibnal_connd (void *arg)
2476 {
2477         long               id = (long)arg;
2478         char               name[16];
2479         wait_queue_t       wait;
2480         unsigned long      flags;
2481         kib_peer_t        *peer;
2482         kib_acceptsock_t  *as;
2483         int                did_something;
2484
2485         snprintf(name, sizeof(name), "kibnal_connd_%02ld", id);
2486         cfs_daemonize(name);
2487         cfs_block_allsigs();
2488
2489         init_waitqueue_entry (&wait, current);
2490
2491         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2492
2493         while (!kibnal_data.kib_shutdown) {
2494                 did_something = 0;
2495
2496                 if (!list_empty (&kibnal_data.kib_connd_acceptq)) {
2497                         as = list_entry (kibnal_data.kib_connd_acceptq.next,
2498                                          kib_acceptsock_t, ibas_list);
2499                         list_del (&as->ibas_list);
2500                         
2501                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2502
2503                         kibnal_handle_svcqry(as->ibas_sock);
2504                         kibnal_free_acceptsock(as);
2505                         
2506                         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2507                         did_something = 1;
2508                 }
2509                         
2510                 /* Only handle an outgoing connection request if there is someone left
2511                  * to handle an incoming svcqry */
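                     /* (the "+ 1" reserves a thread: with kib_n_connd
                      * threads at most kib_n_connd - 1 may be in
                      * kibnal_connect_peer() at once, leaving one free for
                      * the accept queue) */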
2512                 if (!list_empty (&kibnal_data.kib_connd_peers) &&
2513                     ((kibnal_data.kib_connd_connecting + 1) < 
2514                      *kibnal_tunables.kib_n_connd)) {
2515                         peer = list_entry (kibnal_data.kib_connd_peers.next,
2516                                            kib_peer_t, ibp_connd_list);
2517                         
2518                         list_del_init (&peer->ibp_connd_list);
2519                         kibnal_data.kib_connd_connecting++;
2520                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2521
2522                         kibnal_connect_peer (peer);
2523                         kibnal_peer_decref(peer);
2524
2525                         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
2526                         did_something = 1;
2527                         kibnal_data.kib_connd_connecting--;
2528                 }
2529
2530                 if (did_something)
2531                         continue;
2532
2533                 set_current_state (TASK_INTERRUPTIBLE);
2534                 add_wait_queue_exclusive(&kibnal_data.kib_connd_waitq, &wait);
2535
2536                 spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2537
2538                 schedule();
2539
2540                 set_current_state (TASK_RUNNING);
2541                 remove_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
2542
2543                 spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
2544         }
2545
2546         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2547
2548         kibnal_thread_fini ();
2549         return (0);
2550 }
2551
2552 int
2553 kibnal_scheduler(void *arg)
2554 {
2555         long            id = (long)arg;
2556         char            name[16];
2557         kib_rx_t       *rx;
2558         kib_tx_t       *tx;
2559         unsigned long   flags;
2560         int             rc;
2561         int             counter = 0;
2562         int             did_something;
2563
2564         snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
2565         cfs_daemonize(name);
2566         cfs_block_allsigs();
2567
2568         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
2569
2570         while (!kibnal_data.kib_shutdown) {
2571                 did_something = 0;
2572
2573                 while (!list_empty(&kibnal_data.kib_sched_txq)) {
2574                         tx = list_entry(kibnal_data.kib_sched_txq.next,
2575                                         kib_tx_t, tx_list);
2576                         list_del(&tx->tx_list);
2577                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
2578                                                flags);
2579                         kibnal_tx_done(tx);
2580
2581                         spin_lock_irqsave(&kibnal_data.kib_sched_lock,
2582                                           flags);
2583                 }
2584
2585                 if (!list_empty(&kibnal_data.kib_sched_rxq)) {
2586                         rx = list_entry(kibnal_data.kib_sched_rxq.next,
2587                                         kib_rx_t, rx_list);
2588                         list_del(&rx->rx_list);
2589                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
2590                                                flags);
2591
2592                         kibnal_rx(rx);
2593
2594                         did_something = 1;
2595                         spin_lock_irqsave(&kibnal_data.kib_sched_lock,
2596                                           flags);
2597                 }
2598
2599                 /* nothing to do or hogging CPU */
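                     /* (sleep when both queues are empty; otherwise drop
                      * the lock every IBNAL_RESCHED iterations to let
                      * other threads run) */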
2600                 if (!did_something || counter++ == IBNAL_RESCHED) {
2601                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
2602                                                flags);
2603                         counter = 0;
2604
2605                         if (!did_something) {
2606                                 rc = wait_event_interruptible_exclusive(
2607                                         kibnal_data.kib_sched_waitq,
2608                                         !list_empty(&kibnal_data.kib_sched_txq) || 
2609                                         !list_empty(&kibnal_data.kib_sched_rxq) || 
2610                                         kibnal_data.kib_shutdown);
2611                         } else {
2612                                 our_cond_resched();
2613                         }
2614
2615                         spin_lock_irqsave(&kibnal_data.kib_sched_lock,
2616                                           flags);
2617                 }
2618         }
2619
2620         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
2621
2622         kibnal_thread_fini();
2623         return (0);
2624 }