Whamcloud - gitweb
a5ac4692fedbf7a063aefd53ac6106c328e74005
[fs/lustre-release.git] / lnet / klnds / openiblnd / openiblnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include "openiblnd.h"
25
26 /*
27  *  LIB functions follow
28  *
29  */
30 void
31 kibnal_schedule_tx_done (kib_tx_t *tx)
32 {
33         unsigned long flags;
34
35         spin_lock_irqsave (&kibnal_data.kib_sched_lock, flags);
36
37         list_add_tail(&tx->tx_list, &kibnal_data.kib_sched_txq);
38         wake_up (&kibnal_data.kib_sched_waitq);
39
40         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
41 }
42
/* Finalise transmit descriptor 'tx': deregister any memory it mapped,
 * drop its conn ref, return it to the idle pool, then finalize the LNet
 * messages (up to 2) it was carrying.  Deregistration and lnet_finalize()
 * can't run in IRQ context, so defer to thread context if necessary. */
void
kibnal_tx_done (kib_tx_t *tx)
{
        lnet_msg_t      *lntmsg[2];
        unsigned long    flags;
        int              i;
        int              rc;

        LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting callback */
        LASSERT (!tx->tx_passive_rdma_wait);    /* mustn't be awaiting RDMA */

        if (in_interrupt()) {
                /* can't deregister memory/flush FMAs/finalize in IRQ context... */
                kibnal_schedule_tx_done(tx);
                return;
        }

        switch (tx->tx_mapped) {
        default:
                LBUG();                         /* corrupt mapping state */

        case KIB_TX_UNMAPPED:
                break;
                
        case KIB_TX_MAPPED:
                rc = ib_memory_deregister(tx->tx_md.md_handle.mr);
                LASSERT (rc == 0);
                tx->tx_mapped = KIB_TX_UNMAPPED;
                break;

#if IBNAL_FMR
        case KIB_TX_MAPPED_FMR:
                rc = ib_fmr_deregister(tx->tx_md.md_handle.fmr);
                LASSERT (rc == 0);

#ifndef USING_TSAPI
                /* Somewhat belt-and-braces since the tx's conn has closed if
                 * this was a passive RDMA waiting to complete... */
                if (tx->tx_status != 0)
                        ib_fmr_pool_force_flush(kibnal_data.kib_fmr_pool);
#endif
                tx->tx_mapped = KIB_TX_UNMAPPED;
                break;
#endif
        }

        /* tx may have up to 2 ptlmsgs to finalise */
        lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
        lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
        rc = tx->tx_status;             /* saved: tx is reset before finalize */

        if (tx->tx_conn != NULL) {
                kibnal_conn_decref(tx->tx_conn);
                tx->tx_conn = NULL;
        }

        /* scrub descriptor state before returning it to the pool */
        tx->tx_nsp = 0;
        tx->tx_passive_rdma = 0;
        tx->tx_status = 0;

        spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);

        list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_txs);

        spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);

        /* delay finalize until my descs have been freed */
        for (i = 0; i < 2; i++) {
                if (lntmsg[i] == NULL)
                        continue;

                lnet_finalize (kibnal_data.kib_ni, lntmsg[i], rc);
        }
}
117
118 kib_tx_t *
119 kibnal_get_idle_tx (void) 
120 {
121         unsigned long  flags;
122         kib_tx_t      *tx;
123         
124         spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);
125
126         if (list_empty (&kibnal_data.kib_idle_txs)) {
127                 spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
128                 return NULL;
129         }
130
131         tx = list_entry (kibnal_data.kib_idle_txs.next, kib_tx_t, tx_list);
132         list_del (&tx->tx_list);
133
134         /* Allocate a new passive RDMA completion cookie.  It might not be
135          * needed, but we've got a lock right now and we're unlikely to
136          * wrap... */
137         tx->tx_passive_rdma_cookie = kibnal_data.kib_next_tx_cookie++;
138
139         spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
140
141         LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
142         LASSERT (tx->tx_nsp == 0);
143         LASSERT (tx->tx_sending == 0);
144         LASSERT (tx->tx_status == 0);
145         LASSERT (tx->tx_conn == NULL);
146         LASSERT (!tx->tx_passive_rdma);
147         LASSERT (!tx->tx_passive_rdma_wait);
148         LASSERT (tx->tx_lntmsg[0] == NULL);
149         LASSERT (tx->tx_lntmsg[1] == NULL);
150
151         return tx;
152 }
153
/* The peer has signalled completion of a passive RDMA identified by
 * 'cookie'.  Find the matching tx on conn's active list, record 'status',
 * clear its rdma-wait flag and complete it if no send callbacks are still
 * outstanding.  Logs an error if no tx matches (e.g. a late completion
 * after the tx already failed out). */
void
kibnal_complete_passive_rdma(kib_conn_t *conn, __u64 cookie, int status)
{
        struct list_head *ttmp;
        unsigned long     flags;
        int               idle;

        spin_lock_irqsave (&conn->ibc_lock, flags);

        list_for_each (ttmp, &conn->ibc_active_txs) {
                kib_tx_t *tx = list_entry(ttmp, kib_tx_t, tx_list);

                /* only a passive-RDMA tx may be waiting on one */
                LASSERT (tx->tx_passive_rdma ||
                         !tx->tx_passive_rdma_wait);

                /* anything on the active list is waiting for RDMA
                 * completion and/or send callbacks */
                LASSERT (tx->tx_passive_rdma_wait ||
                         tx->tx_sending != 0);

                if (!tx->tx_passive_rdma_wait ||
                    tx->tx_passive_rdma_cookie != cookie)
                        continue;

                CDEBUG(D_NET, "Complete %p "LPD64": %d\n", tx, cookie, status);

                /* XXX Set mlength of reply here */

                tx->tx_status = status;
                tx->tx_passive_rdma_wait = 0;
                idle = (tx->tx_sending == 0);

                if (idle)
                        list_del (&tx->tx_list);

                /* drop the lock before (possibly) completing the tx */
                spin_unlock_irqrestore (&conn->ibc_lock, flags);

                /* I could be racing with tx callbacks.  It's whoever
                 * _makes_ tx idle that frees it */
                if (idle)
                        kibnal_tx_done (tx);
                return;
        }
                
        spin_unlock_irqrestore (&conn->ibc_lock, flags);

        CERROR ("Unmatched (late?) RDMA completion "LPX64" from %s\n",
                cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
}
201
/* (Re)post receive buffer 'rx' on its connection.  Non-zero 'credit'
 * returns a flow-control credit to the peer; non-zero 'rsrvd_credit'
 * returns a reserved (RDMA-reply) credit instead, which is only legal on
 * protocol versions that pre-reserve reply buffers.  On failure the conn
 * is closed (if still established) and rx's ref on it is dropped. */
void
kibnal_post_rx (kib_rx_t *rx, int credit, int rsrvd_credit)
{
        kib_conn_t   *conn = rx->rx_conn;
        int           rc;
        unsigned long flags;

        LASSERT(!rsrvd_credit ||
                conn->ibc_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);

        rx->rx_gl = (struct ib_gather_scatter) {
                .address = rx->rx_vaddr,
                .length  = IBNAL_MSG_SIZE,
                .key     = conn->ibc_rx_pages->ibp_lkey,
        };

        rx->rx_sp = (struct ib_receive_param) {
                .work_request_id        = kibnal_ptr2wreqid(rx, 1),
                .scatter_list           = &rx->rx_gl,
                .num_scatter_entries    = 1,
                .device_specific        = NULL,
                .signaled               = 1,
        };

        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
        LASSERT (rx->rx_nob >= 0);              /* not posted */
        rx->rx_nob = -1;                        /* is now */
        mb();                   /* order the flag update before posting */

        if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
                rc = -ECONNABORTED;
        else
                rc = kibnal_ib_receive(conn->ibc_qp, &rx->rx_sp);

        if (rc == 0) {
                if (credit || rsrvd_credit) {
                        spin_lock_irqsave(&conn->ibc_lock, flags);

                        if (credit)
                                conn->ibc_outstanding_credits++;
                        if (rsrvd_credit)
                                conn->ibc_reserved_credits++;
                        
                        spin_unlock_irqrestore(&conn->ibc_lock, flags);

                        /* returned credit(s) may unblock queued sends */
                        kibnal_check_sends(conn);
                }
                return;
        }

        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                CERROR ("Error posting receive -> %s: %d\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
                kibnal_close_conn (rx->rx_conn, rc);
        } else {
                /* conn is already being torn down; failure expected */
                CDEBUG (D_NET, "Error posting receive -> %s: %d\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
        }

        /* Drop rx's ref */
        kibnal_conn_decref(conn);
}
264
/* Completion handler for a receive work request.  Validates the incoming
 * message (completion status, unpacking, src/dst NID and incarnation
 * stamps), absorbs any returned flow-control credits, then either handles
 * the message inline (NOOP, PUT/GET_DONE repost the buffer directly) or
 * queues the rx for kibnal_rx() in thread context.  Any failure closes
 * the connection and drops the rx's conn ref without reposting. */
void
kibnal_rx_callback (struct ib_cq_entry *e)
{
        kib_rx_t     *rx = (kib_rx_t *)kibnal_wreqid2ptr(e->work_request_id);
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        int           credits;
        unsigned long flags;
        int           rc;
        int           err = -ECONNABORTED;

        CDEBUG (D_NET, "rx %p conn %p\n", rx, conn);
        LASSERT (rx->rx_nob < 0);               /* was posted */
        rx->rx_nob = 0;                         /* isn't now */
        mb();

        /* receives complete with error in any case after we've started
         * closing the QP */
        if (conn->ibc_state >= IBNAL_CONN_DEATHROW)
                goto failed;

        /* We don't post receives until the conn is established */
        LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);

        if (e->status != IB_COMPLETION_STATUS_SUCCESS) {
                CERROR("Rx from %s failed: %d\n", 
                       libcfs_nid2str(conn->ibc_peer->ibp_nid), e->status);
                goto failed;
        }

        LASSERT (e->bytes_transferred >= 0);
        rx->rx_nob = e->bytes_transferred;
        mb();

        rc = kibnal_unpack_msg(msg, conn->ibc_version, rx->rx_nob);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from %s\n",
                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                goto failed;
        }

        /* reject messages from a stale instance of the peer or of me */
        if (!lnet_ptlcompat_matchnid(conn->ibc_peer->ibp_nid,
                                     msg->ibm_srcnid) ||
            !lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
                                     msg->ibm_dstnid) ||
            msg->ibm_srcstamp != conn->ibc_incarnation ||
            msg->ibm_dststamp != kibnal_data.kib_incarnation) {
                CERROR ("Stale rx from %s\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
                err = -ESTALE;
                goto failed;
        }

        /* Have I received credits that will let me send? */
        credits = msg->ibm_credits;
        if (credits != 0) {
                spin_lock_irqsave(&conn->ibc_lock, flags);
                conn->ibc_credits += credits;
                spin_unlock_irqrestore(&conn->ibc_lock, flags);
                
                kibnal_check_sends(conn);
        }

        switch (msg->ibm_type) {
        case IBNAL_MSG_NOOP:
                /* nothing to deliver; just repost, returning a credit */
                kibnal_post_rx (rx, 1, 0);
                return;

        case IBNAL_MSG_IMMEDIATE:
                break;
                
        case IBNAL_MSG_PUT_RDMA:
        case IBNAL_MSG_GET_RDMA:
                CDEBUG(D_NET, "%d RDMA: cookie "LPX64", key %x, addr "LPX64", nob %d\n",
                       msg->ibm_type, msg->ibm_u.rdma.ibrm_cookie,
                       msg->ibm_u.rdma.ibrm_desc.rd_key,
                       msg->ibm_u.rdma.ibrm_desc.rd_addr,
                       msg->ibm_u.rdma.ibrm_desc.rd_nob);
                break;
                
        case IBNAL_MSG_PUT_DONE:
        case IBNAL_MSG_GET_DONE:
                CDEBUG(D_NET, "%d DONE: cookie "LPX64", status %d\n",
                       msg->ibm_type, msg->ibm_u.completion.ibcm_cookie,
                       msg->ibm_u.completion.ibcm_status);

                kibnal_complete_passive_rdma (conn, 
                                              msg->ibm_u.completion.ibcm_cookie,
                                              msg->ibm_u.completion.ibcm_status);

                if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {
                        kibnal_post_rx (rx, 1, 0);
                } else {
                        /* this reply buffer was pre-reserved */
                        kibnal_post_rx (rx, 0, 1);
                }
                return;
                        
        default:
                CERROR ("Bad msg type %x from %s\n",
                        msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                goto failed;
        }

        kibnal_peer_alive(conn->ibc_peer);

        /* schedule for kibnal_rx() in thread context */
        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
        
        list_add_tail (&rx->rx_list, &kibnal_data.kib_sched_rxq);
        wake_up (&kibnal_data.kib_sched_waitq);
        
        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
        return;
        
 failed:
        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        kibnal_close_conn(conn, err);

        /* Don't re-post rx & drop its ref on conn */
        kibnal_conn_decref(conn);
}
387
388 void
389 kibnal_rx (kib_rx_t *rx)
390 {
391         int          rc = 0;
392         kib_msg_t   *msg = rx->rx_msg;
393
394         switch (msg->ibm_type) {
395         case IBNAL_MSG_GET_RDMA:
396                 rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.rdma.ibrm_hdr,
397                                 msg->ibm_srcnid, rx, 1);
398                 break;
399                 
400         case IBNAL_MSG_PUT_RDMA:
401                 rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.rdma.ibrm_hdr,
402                                 msg->ibm_srcnid, rx, 1);
403                 break;
404
405         case IBNAL_MSG_IMMEDIATE:
406                 rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.immediate.ibim_hdr,
407                                 msg->ibm_srcnid, rx, 0);
408                 break;
409
410         default:
411                 LBUG();
412                 break;
413         }
414
415         if (rc < 0) {
416                 kibnal_close_conn(rx->rx_conn, rc);
417                 kibnal_post_rx (rx, 1, 0);
418         }
419 }
420
#if 0
/* Currently compiled out.  Translates a kernel virtual address to a
 * physical address, handling vmalloc (and, with HIGHMEM, pkmap) ranges.
 * NOTE(review): uses '#if CONFIG_HIGHMEM' rather than '#ifdef'; if this
 * code is ever re-enabled, confirm the config macro expands to a value. */
int
kibnal_kvaddr_to_phys (unsigned long vaddr, __u64 *physp)
{
        struct page *page;

        if (vaddr >= VMALLOC_START &&
            vaddr < VMALLOC_END)
                page = vmalloc_to_page ((void *)vaddr);
#if CONFIG_HIGHMEM
        else if (vaddr >= PKMAP_BASE &&
                 vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
                page = vmalloc_to_page ((void *)vaddr);
        /* in 2.4 ^ just walks the page tables */
#endif
        else
                page = virt_to_page (vaddr);

        if (page == NULL ||
            !VALID_PAGE (page))
                return (-EFAULT);

        /* physical page base + offset within page */
        *physp = lnet_page2phys(page) + (vaddr & (PAGE_SIZE - 1));
        return (0);
}
#endif
447
448 int
449 kibnal_map_iov (kib_tx_t *tx, int access,
450                 unsigned int niov, struct iovec *iov, int offset, int nob)
451                  
452 {
453         void   *vaddr;
454         int     rc;
455
456         LASSERT (nob > 0);
457         LASSERT (niov > 0);
458         LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
459
460         while (offset >= iov->iov_len) {
461                 offset -= iov->iov_len;
462                 niov--;
463                 iov++;
464                 LASSERT (niov > 0);
465         }
466
467         if (nob > iov->iov_len - offset) {
468                 CERROR ("Can't map multiple vaddr fragments\n");
469                 return (-EMSGSIZE);
470         }
471
472         vaddr = (void *)(((unsigned long)iov->iov_base) + offset);
473         tx->tx_md.md_addr = (__u64)((unsigned long)vaddr);
474
475         rc = ib_memory_register (kibnal_data.kib_pd,
476                                  vaddr, nob,
477                                  access,
478                                  &tx->tx_md.md_handle.mr,
479                                  &tx->tx_md.md_lkey,
480                                  &tx->tx_md.md_rkey);
481         
482         if (rc != 0) {
483                 CERROR ("Can't map vaddr: %d\n", rc);
484                 return (rc);
485         }
486
487         tx->tx_mapped = KIB_TX_MAPPED;
488         return (0);
489 }
490
/* Register the page-based payload (kiov, offset, nob) for RDMA.  Builds a
 * temporary physical-fragment array and registers it either through the
 * FMR pool (IBNAL_FMR) or as a physical memory region, recording handle
 * and keys in tx->tx_md.  The pages after the first must be contiguous
 * full pages (no gaps), else -EINVAL. */
int
kibnal_map_kiov (kib_tx_t *tx, int access,
                  int nkiov, lnet_kiov_t *kiov,
                  int offset, int nob)
{
#if IBNAL_FMR
        __u64                      *phys;
        const int                   mapped = KIB_TX_MAPPED_FMR;
#else
        struct ib_physical_buffer  *phys;
        const int                   mapped = KIB_TX_MAPPED;
#endif
        int                         page_offset;
        int                         nphys;
        int                         resid;
        int                         phys_size;
        int                         rc;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);

        /* skip over pages wholly consumed by 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        /* worst case: one phys fragment per remaining kiov */
        phys_size = nkiov * sizeof (*phys);
        LIBCFS_ALLOC(phys, phys_size);
        if (phys == NULL) {
                CERROR ("Can't allocate tmp phys\n");
                return (-ENOMEM);
        }

        page_offset = kiov->kiov_offset + offset;
#if IBNAL_FMR
        phys[0] = lnet_page2phys(kiov->kiov_page);
#else
        phys[0].address = lnet_page2phys(kiov->kiov_page);
        phys[0].size = PAGE_SIZE;
#endif
        nphys = 1;
        resid = nob - (kiov->kiov_len - offset);

        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                /* every fragment after the first must start at page offset
                 * 0 and (unless it's the last) cover a whole page */
                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) && 
                     kiov->kiov_len < PAGE_SIZE)) {
                        int i;
                        /* Can't have gaps */
                        CERROR ("Can't make payload contiguous in I/O VM:"
                                "page %d, offset %d, len %d \n", nphys, 
                                kiov->kiov_offset, kiov->kiov_len);

                        for (i = -nphys; i < nkiov; i++) 
                        {
                                CERROR("kiov[%d] %p +%d for %d\n",
                                       i, kiov[i].kiov_page, kiov[i].kiov_offset, kiov[i].kiov_len);
                        }
                        
                        rc = -EINVAL;
                        goto out;
                }

                if (nphys == LNET_MAX_IOV) {
                        CERROR ("payload too big (%d)\n", nphys);
                        rc = -EMSGSIZE;
                        goto out;
                }

                /* can't overrun the temporary array */
                LASSERT (nphys * sizeof (*phys) < phys_size);
#if IBNAL_FMR
                phys[nphys] = lnet_page2phys(kiov->kiov_page);
#else
                phys[nphys].address = lnet_page2phys(kiov->kiov_page);
                phys[nphys].size = PAGE_SIZE;
#endif
                nphys++;

                resid -= PAGE_SIZE;
        }

        tx->tx_md.md_addr = IBNAL_RDMA_BASE;

#if IBNAL_FMR
        rc = ib_fmr_register_physical (kibnal_data.kib_fmr_pool,
                                       phys, nphys,
                                       &tx->tx_md.md_addr,
                                       page_offset,
                                       &tx->tx_md.md_handle.fmr,
                                       &tx->tx_md.md_lkey,
                                       &tx->tx_md.md_rkey);
#else
        rc = ib_memory_register_physical (kibnal_data.kib_pd,
                                          phys, nphys,
                                          &tx->tx_md.md_addr,
                                          nob, page_offset,
                                          access,
                                          &tx->tx_md.md_handle.mr,
                                          &tx->tx_md.md_lkey,
                                          &tx->tx_md.md_rkey);
#endif
        if (rc == 0) {
                CDEBUG(D_NET, "Mapped %d pages %d bytes @ offset %d: lkey %x, rkey %x\n",
                       nphys, nob, page_offset, tx->tx_md.md_lkey, tx->tx_md.md_rkey);
                tx->tx_mapped = mapped;
        } else {
                CERROR ("Can't map phys: %d\n", rc);
                rc = -EFAULT;
        }

 out:
        LIBCFS_FREE(phys, phys_size);
        return (rc);
}
614
615 kib_conn_t *
616 kibnal_find_conn_locked (kib_peer_t *peer)
617 {
618         struct list_head *tmp;
619
620         /* just return the first connection */
621         list_for_each (tmp, &peer->ibp_conns) {
622                 return (list_entry(tmp, kib_conn_t, ibc_list));
623         }
624
625         return (NULL);
626 }
627
/* Push queued transmits out on 'conn' as flow-control credits and QP
 * capacity allow.  Moves reserved-credit txs onto the main queue, sends a
 * NOOP to return accumulated credits (or keep the conn alive) when nothing
 * else is queued, and posts work requests for each eligible tx.  Any post
 * failure unwinds the tx's accounting and closes the connection. */
void
kibnal_check_sends (kib_conn_t *conn)
{
        unsigned long   flags;
        kib_tx_t       *tx;
        int             rc;
        int             i;
        int             consume_credit;
        int             done;
        int             nwork;

        spin_lock_irqsave (&conn->ibc_lock, flags);

        LASSERT (conn->ibc_nsends_posted <= IBNAL_RX_MSGS);
        LASSERT (conn->ibc_reserved_credits >= 0);

        /* promote reserved-credit txs while reserved credits remain */
        while (conn->ibc_reserved_credits > 0 &&
               !list_empty(&conn->ibc_tx_queue_rsrvd)) {
                LASSERT (conn->ibc_version !=
                         IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
                tx = list_entry(conn->ibc_tx_queue_rsrvd.next,
                                kib_tx_t, tx_list);
                list_del(&tx->tx_list);
                list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
                conn->ibc_reserved_credits--;
        }

        /* nothing queued but credits piling up (or keepalive due):
         * queue a NOOP to return them */
        if (list_empty(&conn->ibc_tx_queue) &&
            list_empty(&conn->ibc_tx_queue_nocred) &&
            (conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER ||
             kibnal_send_keepalive(conn))) {
                /* drop the lock: kibnal_get_idle_tx takes kib_tx_lock */
                spin_unlock_irqrestore(&conn->ibc_lock, flags);
                
                tx = kibnal_get_idle_tx();
                if (tx != NULL)
                        kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);

                spin_lock_irqsave(&conn->ibc_lock, flags);
                
                if (tx != NULL)
                        kibnal_queue_tx_locked(tx, conn);
        }

        for (;;) {
                /* no-credit txs take priority and consume no credit */
                if (!list_empty(&conn->ibc_tx_queue_nocred)) {
                        LASSERT (conn->ibc_version !=
                                 IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
                        tx = list_entry(conn->ibc_tx_queue_nocred.next,
                                        kib_tx_t, tx_list);
                        consume_credit = 0;
                } else if (!list_empty (&conn->ibc_tx_queue)) {
                        tx = list_entry (conn->ibc_tx_queue.next, 
                                         kib_tx_t, tx_list);
                        consume_credit = 1;
                } else {
                        /* nothing waiting */
                        break;
                }

                /* We rely on this for QP sizing */
                LASSERT (tx->tx_nsp > 0 && tx->tx_nsp <= 2);

                LASSERT (conn->ibc_outstanding_credits >= 0);
                LASSERT (conn->ibc_outstanding_credits <= IBNAL_MSG_QUEUE_SIZE);
                LASSERT (conn->ibc_credits >= 0);
                LASSERT (conn->ibc_credits <= IBNAL_MSG_QUEUE_SIZE);

                /* Not on ibc_rdma_queue */
                LASSERT (!tx->tx_passive_rdma_wait);

                /* QP send queue full */
                if (conn->ibc_nsends_posted == IBNAL_RX_MSGS)
                        break;

                if (consume_credit) {
                        if (conn->ibc_credits == 0)     /* no credits */
                                break;
                
                        if (conn->ibc_credits == 1 &&   /* last credit reserved for */
                            conn->ibc_outstanding_credits == 0) /* giving back credits */
                                break;
                }
                
                list_del (&tx->tx_list);

                if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
                    (!list_empty(&conn->ibc_tx_queue) ||
                     !list_empty(&conn->ibc_tx_queue_nocred) ||
                     (conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER &&
                      !kibnal_send_keepalive(conn)))) {
                        /* redundant NOOP */
                        spin_unlock_irqrestore(&conn->ibc_lock, flags);
                        kibnal_tx_done(tx);
                        spin_lock_irqsave(&conn->ibc_lock, flags);
                        continue;
                }

                /* piggyback all outstanding credits on this message */
                kibnal_pack_msg(tx->tx_msg, conn->ibc_version,
                                conn->ibc_outstanding_credits,
                                conn->ibc_peer->ibp_nid, conn->ibc_incarnation);

                conn->ibc_outstanding_credits = 0;
                conn->ibc_nsends_posted++;
                if (consume_credit)
                        conn->ibc_credits--;

                tx->tx_sending = tx->tx_nsp;
                tx->tx_passive_rdma_wait = tx->tx_passive_rdma;
                list_add (&tx->tx_list, &conn->ibc_active_txs);

                spin_unlock_irqrestore (&conn->ibc_lock, flags);

                /* NB the gap between removing tx from the queue and sending it
                 * allows message re-ordering to occur */

                LASSERT (tx->tx_nsp > 0);

                rc = -ECONNABORTED;
                nwork = 0;
                if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                        tx->tx_status = 0;
                        /* Driver only accepts 1 item at a time */
                        for (i = 0; i < tx->tx_nsp; i++) {
                                rc = kibnal_ib_send(conn->ibc_qp, &tx->tx_sp[i]);
                                if (rc != 0)
                                        break;
                                nwork++;
                        }
                }

                conn->ibc_last_send = jiffies;

                spin_lock_irqsave (&conn->ibc_lock, flags);
                if (rc != 0) {
                        /* NB credits are transferred in the actual
                         * message, which can only be the last work item */
                        conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
                        if (consume_credit)
                                conn->ibc_credits++;
                        conn->ibc_nsends_posted--;

                        tx->tx_status = rc;
                        tx->tx_passive_rdma_wait = 0;
                        /* discount the work items that were never posted */
                        tx->tx_sending -= tx->tx_nsp - nwork;

                        done = (tx->tx_sending == 0);
                        if (done)
                                list_del (&tx->tx_list);
                        
                        spin_unlock_irqrestore (&conn->ibc_lock, flags);
                        
                        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
                                CERROR ("Error %d posting transmit to %s\n", 
                                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        else
                                CDEBUG (D_NET, "Error %d posting transmit to %s\n",
                                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));

                        kibnal_close_conn (conn, rc);

                        if (done)
                                kibnal_tx_done (tx);
                        return;
                }
                
        }

        spin_unlock_irqrestore (&conn->ibc_lock, flags);
}
796
/* Completion handler for one send work request of a transmit.  Decrements
 * the outstanding-send count; the tx becomes idle (and is freed) only when
 * all its sends have completed AND it is not waiting for a passive RDMA.
 * A failed completion poisons tx_status and closes the connection. */
void
kibnal_tx_callback (struct ib_cq_entry *e)
{
        kib_tx_t     *tx = (kib_tx_t *)kibnal_wreqid2ptr(e->work_request_id);
        kib_conn_t   *conn;
        unsigned long flags;
        int           idle;

        conn = tx->tx_conn;
        LASSERT (conn != NULL);
        LASSERT (tx->tx_sending != 0);

        spin_lock_irqsave(&conn->ibc_lock, flags);

        CDEBUG(D_NET, "conn %p tx %p [%d/%d]: %d\n", conn, tx,
               tx->tx_nsp - tx->tx_sending, tx->tx_nsp,
               e->status);

        /* I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'.  If it's
         * not me, then I take an extra ref on conn so it can't disappear
         * under me. */

        tx->tx_sending--;
        idle = (tx->tx_sending == 0) &&         /* This is the final callback */
               (!tx->tx_passive_rdma_wait);     /* Not waiting for RDMA completion */
        if (idle)
                list_del(&tx->tx_list);

        kibnal_conn_addref(conn);

        /* last send callback releases this tx's QP send-queue slot */
        if (tx->tx_sending == 0)
                conn->ibc_nsends_posted--;

        /* record the first error only */
        if (e->status != IB_COMPLETION_STATUS_SUCCESS &&
            tx->tx_status == 0)
                tx->tx_status = -ECONNABORTED;
                
        spin_unlock_irqrestore(&conn->ibc_lock, flags);

        if (idle)
                kibnal_tx_done (tx);

        if (e->status != IB_COMPLETION_STATUS_SUCCESS) {
                CDEBUG (D_NETERROR, "Tx completion to %s failed: %d\n", 
                        libcfs_nid2str(conn->ibc_peer->ibp_nid), e->status);
                kibnal_close_conn (conn, -ENETDOWN);
        } else {
                kibnal_peer_alive(conn->ibc_peer);
                /* can I shovel some more sends out the door? */
                kibnal_check_sends(conn);
        }

        kibnal_conn_decref(conn);
}
852
853 void
854 kibnal_callback (ib_cq_t *cq, struct ib_cq_entry *e, void *arg)
855 {
856         if (kibnal_wreqid_is_rx(e->work_request_id))
857                 kibnal_rx_callback (e);
858         else
859                 kibnal_tx_callback (e);
860 }
861
862 void
863 kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob)
864 {
865         struct ib_gather_scatter *gl = &tx->tx_gl[tx->tx_nsp];
866         struct ib_send_param     *sp = &tx->tx_sp[tx->tx_nsp];
867         int                       fence;
868         int                       nob = offsetof (kib_msg_t, ibm_u) + body_nob;
869
870         LASSERT (tx->tx_nsp >= 0 && 
871                  tx->tx_nsp < sizeof(tx->tx_sp)/sizeof(tx->tx_sp[0]));
872         LASSERT (nob <= IBNAL_MSG_SIZE);
873
874         kibnal_init_msg(tx->tx_msg, type, body_nob);
875
876         /* Fence the message if it's bundled with an RDMA read */
877         fence = (tx->tx_nsp > 0) &&
878                 (type == IBNAL_MSG_PUT_DONE);
879
880         *gl = (struct ib_gather_scatter) {
881                 .address = tx->tx_vaddr,
882                 .length  = nob,
883                 .key     = kibnal_data.kib_tx_pages->ibp_lkey,
884         };
885
886         /* NB If this is an RDMA read, the completion message must wait for
887          * the RDMA to complete.  Sends wait for previous RDMA writes
888          * anyway... */
889         *sp = (struct ib_send_param) {
890                 .work_request_id      = kibnal_ptr2wreqid(tx, 0),
891                 .op                   = IB_OP_SEND,
892                 .gather_list          = gl,
893                 .num_gather_entries   = 1,
894                 .device_specific      = NULL,
895                 .solicited_event      = 1,
896                 .signaled             = 1,
897                 .immediate_data_valid = 0,
898                 .fence                = fence,
899                 .inline_data          = 0,
900         };
901
902         tx->tx_nsp++;
903 }
904
905 void
906 kibnal_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
907 {
908         unsigned long         flags;
909
910         spin_lock_irqsave(&conn->ibc_lock, flags);
911
912         kibnal_queue_tx_locked (tx, conn);
913         
914         spin_unlock_irqrestore(&conn->ibc_lock, flags);
915         
916         kibnal_check_sends(conn);
917 }
918
919 void
920 kibnal_schedule_active_connect_locked (kib_peer_t *peer)
921 {
922         /* Called with exclusive kib_global_lock */
923
924         peer->ibp_connecting++;
925         kibnal_peer_addref(peer); /* extra ref for connd */
926         
927         spin_lock (&kibnal_data.kib_connd_lock);
928         
929         LASSERT (list_empty(&peer->ibp_connd_list));
930         list_add_tail (&peer->ibp_connd_list,
931                        &kibnal_data.kib_connd_peers);
932         wake_up (&kibnal_data.kib_connd_waitq);
933         
934         spin_unlock (&kibnal_data.kib_connd_lock);
935 }
936
/* Launch 'tx' towards 'nid': queue it on an existing connection if there is
 * one, otherwise create the peer (first pass only) and/or schedule an active
 * connect and park the tx on the peer's blocked-tx queue.  Having committed
 * to send, any failure completes the tx with an error status. */
void
kibnal_launch_tx (kib_tx_t *tx, lnet_nid_t nid)
{
        unsigned long    flags;
        kib_peer_t      *peer;
        kib_conn_t      *conn;
        int              retry;
        int              rc;
        rwlock_t        *g_lock = &kibnal_data.kib_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */
        
        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
        LASSERT (tx->tx_nsp > 0);               /* work items have been set up */

        /* at most two passes: the second runs after adding the peer */
        for (retry = 0; ; retry = 1) {
                read_lock_irqsave(g_lock, flags);
        
                peer = kibnal_find_peer_locked (nid);
                if (peer != NULL) {
                        conn = kibnal_find_conn_locked (peer);
                        if (conn != NULL) {
                                kibnal_conn_addref(conn); /* 1 ref for me...*/
                                read_unlock_irqrestore(g_lock, flags);
                
                                kibnal_queue_tx (tx, conn);
                                kibnal_conn_decref(conn); /* ...until here */
                                return;
                        }
                }
                
                /* Making one or more connections; I'll need a write lock... */
                /* NB not an atomic upgrade: the lock is dropped then retaken,
                 * so the peer must be looked up again below.  irqs stay
                 * disabled throughout ('flags' saved at read_lock_irqsave,
                 * restored at write_unlock_irqrestore). */
                read_unlock(g_lock);
                write_lock(g_lock);

                peer = kibnal_find_peer_locked (nid);
                if (peer != NULL)
                        break;
                
                write_unlock_irqrestore (g_lock, flags);

                if (retry) {
                        /* peer was added on the first pass but has gone */
                        CERROR("Can't find peer %s\n", libcfs_nid2str(nid));
                        tx->tx_status = -EHOSTUNREACH;
                        kibnal_tx_done (tx);
                        return;
                }

                rc = kibnal_add_persistent_peer(nid, LNET_NIDADDR(nid),
                                                lnet_acceptor_port());
                if (rc != 0) {
                        CERROR("Can't add peer %s: %d\n",
                               libcfs_nid2str(nid), rc);
                        tx->tx_status = rc;
                        kibnal_tx_done(tx);
                        return;
                }
        }

        /* write lock held; peer found */
        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kibnal_conn_addref(conn);       /* +1 ref from me... */
                write_unlock_irqrestore (g_lock, flags);
                
                kibnal_queue_tx (tx, conn);
                kibnal_conn_decref(conn);       /* ...until here */
                return;
        }

        if (peer->ibp_connecting == 0 &&
            peer->ibp_accepting == 0) {
                /* no connection attempt under way; start one unless the
                 * reconnect backoff interval hasn't expired yet */
                if (!(peer->ibp_reconnect_interval == 0 || /* first attempt */
                      time_after_eq(jiffies, peer->ibp_reconnect_time))) {
                        write_unlock_irqrestore (g_lock, flags);
                        tx->tx_status = -EHOSTUNREACH;
                        kibnal_tx_done (tx);
                        return;
                }
        
                kibnal_schedule_active_connect_locked(peer);
        }
        
        /* A connection is being established; queue the message... */
        list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);

        write_unlock_irqrestore (g_lock, flags);
}
1026
1027 void
1028 kibnal_txlist_done (struct list_head *txlist, int status)
1029 {
1030         kib_tx_t *tx;
1031
1032         while (!list_empty(txlist)) {
1033                 tx = list_entry (txlist->next, kib_tx_t, tx_list);
1034
1035                 list_del (&tx->tx_list);
1036                 /* complete now */
1037                 tx->tx_status = status;
1038                 kibnal_tx_done (tx);
1039         }
1040 }
1041
/* Start a "passive" RDMA: map the local buffer, then send its rkey/address
 * to the peer, who performs the actual RDMA (reading our buffer for a PUT,
 * writing it for a GET) and replies with a completion message matched by
 * tx_passive_rdma_cookie.  Returns 0 on success; on failure after the tx is
 * allocated, the tx is completed with the error and -EIO/-ENOMEM returned. */
int
kibnal_start_passive_rdma (int type, lnet_msg_t *lntmsg,
                           int niov, struct iovec *iov, lnet_kiov_t *kiov,
                           int nob)
{
        lnet_nid_t  nid = lntmsg->msg_target.nid;
        kib_tx_t   *tx;
        kib_msg_t  *ibmsg;
        int         rc;
        int         access;
        
        LASSERT (type == IBNAL_MSG_PUT_RDMA || 
                 type == IBNAL_MSG_GET_RDMA);
        LASSERT (nob > 0);
        LASSERT (!in_interrupt());              /* Mapping could block */

        if (type == IBNAL_MSG_PUT_RDMA) {
                /* peer will RDMA-read our buffer */
                access = IB_ACCESS_REMOTE_READ;
        } else {
                /* peer will RDMA-write our buffer */
                access = IB_ACCESS_REMOTE_WRITE |
                         IB_ACCESS_LOCAL_WRITE;
        }

        tx = kibnal_get_idle_tx ();
        if (tx == NULL) {
                CERROR("Can't allocate %s txd for %s\n",
                       (type == IBNAL_MSG_PUT_RDMA) ? "PUT/REPLY" : "GET",
                       libcfs_nid2str(nid));
                return -ENOMEM;
        }

        
        if (iov != NULL) 
                rc = kibnal_map_iov (tx, access, niov, iov, 0, nob);
        else
                rc = kibnal_map_kiov (tx, access, niov, kiov, 0, nob);

        if (rc != 0) {
                CERROR ("Can't map RDMA for %s: %d\n", 
                        libcfs_nid2str(nid), rc);
                goto failed;
        }
        
        if (type == IBNAL_MSG_GET_RDMA) {
                /* reply gets finalized when tx completes */
                tx->tx_lntmsg[1] = lnet_create_reply_msg(kibnal_data.kib_ni, 
                                                         lntmsg);
                if (tx->tx_lntmsg[1] == NULL) {
                        CERROR ("Can't create reply for GET -> %s\n",
                                libcfs_nid2str(nid));
                        rc = -ENOMEM;
                        goto failed;
                }
        }
        
        tx->tx_passive_rdma = 1;

        /* fill in the RDMA descriptor the peer needs to access our buffer */
        ibmsg = tx->tx_msg;

        ibmsg->ibm_u.rdma.ibrm_hdr = lntmsg->msg_hdr;
        ibmsg->ibm_u.rdma.ibrm_cookie = tx->tx_passive_rdma_cookie;
        ibmsg->ibm_u.rdma.ibrm_desc.rd_key = tx->tx_md.md_rkey;
        ibmsg->ibm_u.rdma.ibrm_desc.rd_addr = tx->tx_md.md_addr;
        ibmsg->ibm_u.rdma.ibrm_desc.rd_nob = nob;

        kibnal_init_tx_msg (tx, type, sizeof (kib_rdma_msg_t));

        CDEBUG(D_NET, "Passive: %p cookie "LPX64", key %x, addr "
               LPX64", nob %d\n",
               tx, tx->tx_passive_rdma_cookie, tx->tx_md.md_rkey,
               tx->tx_md.md_addr, nob);
        
        /* lntmsg gets finalized when tx completes. */
        tx->tx_lntmsg[0] = lntmsg;

        kibnal_launch_tx(tx, nid);
        return (0);

 failed:
        /* kibnal_tx_done() unmaps and finalizes any attached lnet messages */
        tx->tx_status = rc;
        kibnal_tx_done (tx);
        return (-EIO);
}
1125
/* Perform the "active" side of an RDMA set up by the peer: RDMA_WRITE the
 * GET reply payload into the peer's buffer, or RDMA_READ the PUT payload out
 * of it, then send the matching completion message (GET_DONE/PUT_DONE)
 * carrying 'status'.  If mapping fails the RDMA is skipped and the
 * completion is sent with the error instead. */
void
kibnal_start_active_rdma (int type, int status,
                          kib_rx_t *rx, lnet_msg_t *lntmsg, 
                          unsigned int niov,
                          struct iovec *iov, lnet_kiov_t *kiov,
                          int offset, int nob)
{
        kib_msg_t    *rxmsg = rx->rx_msg;
        kib_msg_t    *txmsg;
        kib_tx_t     *tx;
        int           access;
        int           rdma_op;
        int           rc;

        CDEBUG(D_NET, "type %d, status %d, niov %d, offset %d, nob %d\n",
               type, status, niov, offset, nob);

        /* Called by scheduler */
        LASSERT (!in_interrupt ());

        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        /* No data if we're completing with failure */
        LASSERT (status == 0 || nob == 0);

        LASSERT (type == IBNAL_MSG_GET_DONE ||
                 type == IBNAL_MSG_PUT_DONE);

        if (type == IBNAL_MSG_GET_DONE) {
                /* writing remote buffer: no local access flags needed */
                access   = 0;
                rdma_op  = IB_OP_RDMA_WRITE;
                LASSERT (rxmsg->ibm_type == IBNAL_MSG_GET_RDMA);
        } else {
                /* reading into local buffer: must be locally writable */
                access   = IB_ACCESS_LOCAL_WRITE;
                rdma_op  = IB_OP_RDMA_READ;
                LASSERT (rxmsg->ibm_type == IBNAL_MSG_PUT_RDMA);
        }

        tx = kibnal_get_idle_tx ();
        if (tx == NULL) {
                CERROR ("tx descs exhausted on RDMA from %s"
                        " completing locally with failure\n",
                        libcfs_nid2str(rx->rx_conn->ibc_peer->ibp_nid));
                lnet_finalize (kibnal_data.kib_ni, lntmsg, -ENOMEM);
                return;
        }
        LASSERT (tx->tx_nsp == 0);
                        
        if (nob != 0) {
                /* We actually need to transfer some data (the transfer
                 * size could get truncated to zero when the incoming
                 * message is matched) */

                if (kiov != NULL)
                        rc = kibnal_map_kiov (tx, access,
                                              niov, kiov, offset, nob);
                else
                        rc = kibnal_map_iov (tx, access,
                                             niov, iov, offset, nob);
                
                if (rc != 0) {
                        CERROR ("Can't map RDMA -> %s: %d\n", 
                                libcfs_nid2str(rx->rx_conn->ibc_peer->ibp_nid), 
                                rc);
                        /* We'll skip the RDMA and complete with failure. */
                        status = rc;
                        nob = 0;
                } else {
                        /* work item 0: the RDMA itself, targeting the
                         * peer's buffer described in the incoming message */
                        tx->tx_gl[0] = (struct ib_gather_scatter) {
                                .address = tx->tx_md.md_addr,
                                .length  = nob,
                                .key     = tx->tx_md.md_lkey,
                        };
                
                        tx->tx_sp[0] = (struct ib_send_param) {
                                .work_request_id      = kibnal_ptr2wreqid(tx, 0),
                                .op                   = rdma_op,
                                .gather_list          = &tx->tx_gl[0],
                                .num_gather_entries   = 1,
                                .remote_address       = rxmsg->ibm_u.rdma.ibrm_desc.rd_addr,
                                .rkey                 = rxmsg->ibm_u.rdma.ibrm_desc.rd_key,
                                .device_specific      = NULL,
                                .solicited_event      = 0,
                                .signaled             = 1,
                                .immediate_data_valid = 0,
                                .fence                = 0,
                                .inline_data          = 0,
                        };

                        tx->tx_nsp = 1;
                }
        }

        /* completion message echoes the peer's cookie plus our status */
        txmsg = tx->tx_msg;

        txmsg->ibm_u.completion.ibcm_cookie = rxmsg->ibm_u.rdma.ibrm_cookie;
        txmsg->ibm_u.completion.ibcm_status = status;
        
        kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));

        if (status == 0 && nob != 0) {
                LASSERT (tx->tx_nsp > 1);
                /* RDMA: lntmsg gets finalized when the tx completes.  This
                 * is after the completion message has been sent, which in
                 * turn is after the RDMA has finished. */
                tx->tx_lntmsg[0] = lntmsg;
        } else {
                LASSERT (tx->tx_nsp == 1);
                /* No RDMA: local completion happens now! */
                CDEBUG(D_NET, "No data: immediate completion\n");
                lnet_finalize (kibnal_data.kib_ni, lntmsg,
                              status == 0 ? 0 : -EIO);
        }

        kibnal_queue_tx(tx, rx->rx_conn);
}
1243
/* LNET send() method: choose between sending the payload inline in an
 * IMMEDIATE message (when it fits in IBNAL_MSG_SIZE) and setting up a
 * passive RDMA for the peer to pull/push the data.  Returns 0 on success or
 * a -ve error (in which case LNET finalizes lntmsg itself). */
int
kibnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        lnet_hdr_t       *hdr = &lntmsg->msg_hdr; 
        int               type = lntmsg->msg_type; 
        lnet_process_id_t target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      payload_niov = lntmsg->msg_niov; 
        struct iovec     *payload_iov = lntmsg->msg_iov; 
        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
        unsigned int      payload_offset = lntmsg->msg_offset;
        unsigned int      payload_nob = lntmsg->msg_len;
        kib_msg_t        *ibmsg;
        kib_tx_t         *tx;
        int               nob;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
               payload_nob, payload_niov, libcfs_id2str(target));

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= LNET_MAX_IOV);

        /* Thread context if we're sending payload */
        LASSERT (!in_interrupt() || payload_niov == 0);
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        switch (type) {
        default:
                LBUG();
                return (-EIO);
                
        case LNET_MSG_ACK:
                LASSERT (payload_nob == 0);
                break;

        case LNET_MSG_GET:
                /* routed GETs always go IMMEDIATE; the reply comes back
                 * through the router rather than by direct RDMA */
                if (routing || target_is_router)
                        break;                  /* send IMMEDIATE */

                /* is the REPLY message too small for RDMA? */
                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
                if (nob <= IBNAL_MSG_SIZE)
                        break;                  /* send IMMEDIATE */

                /* GET RDMA: peer will write the reply into our MD buffer */
                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
                        return kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA, lntmsg, 
                                                         lntmsg->msg_md->md_niov, 
                                                         lntmsg->msg_md->md_iov.iov, NULL,
                                                         lntmsg->msg_md->md_length);

                return kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA, lntmsg, 
                                                 lntmsg->msg_md->md_niov, 
                                                 NULL, lntmsg->msg_md->md_iov.kiov,
                                                 lntmsg->msg_md->md_length);

        case LNET_MSG_REPLY:
        case LNET_MSG_PUT:
                /* Is the payload small enough not to need RDMA? */
                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
                if (nob <= IBNAL_MSG_SIZE)
                        break;                  /* send IMMEDIATE */
                
                /* PUT RDMA: peer will read the payload out of our buffer */
                return kibnal_start_passive_rdma(IBNAL_MSG_PUT_RDMA, lntmsg,
                                                 payload_niov,
                                                 payload_iov, payload_kiov,
                                                 payload_nob);
        }

        /* Send IMMEDIATE: copy the whole payload into the message buffer */

        tx = kibnal_get_idle_tx();
        if (tx == NULL) {
                CERROR ("Can't send %d to %s: tx descs exhausted%s\n", 
                        type, libcfs_nid2str(target.nid), 
                        in_interrupt() ? " (intr)" : "");
                return (-ENOMEM);
        }

        ibmsg = tx->tx_msg;
        ibmsg->ibm_u.immediate.ibim_hdr = *hdr;

        if (payload_kiov != NULL)
                lnet_copy_kiov2flat(IBNAL_MSG_SIZE, ibmsg,
                                    offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                    payload_niov, payload_kiov, 
                                    payload_offset, payload_nob);
        else
                lnet_copy_iov2flat(IBNAL_MSG_SIZE, ibmsg,
                                   offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                   payload_niov, payload_iov, 
                                   payload_offset, payload_nob);

        kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE,
                            offsetof(kib_immediate_msg_t, 
                                     ibim_payload[payload_nob]));

        /* lntmsg gets finalized when tx completes */
        tx->tx_lntmsg[0] = lntmsg;

        kibnal_launch_tx(tx, target.nid);
        return (0);
}
1350
1351 int
1352 kibnal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
1353                    void **new_private)
1354 {
1355         kib_rx_t    *rx = private;
1356         kib_conn_t  *conn = rx->rx_conn;
1357
1358         if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {
1359                 /* Can't block if RDMA completions need normal credits */
1360                 LCONSOLE_ERROR(0x12a, "Dropping message from %s: no buffers free. "
1361                                "%s is running an old version of LNET that may "
1362                                "deadlock if messages wait for buffers)\n",
1363                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
1364                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
1365                 return -EDEADLK;
1366         }
1367         
1368         *new_private = private;
1369         return 0;
1370 }
1371
/* LNET recv() method: deliver the payload of a received message into the
 * caller's buffers.  IMMEDIATE payloads are copied straight out of the rx
 * buffer; GET/PUT RDMA requests kick off the active-side RDMA.  The rx
 * buffer is re-posted before returning. */
int
kibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
             int delayed, unsigned int niov,
             struct iovec *iov, lnet_kiov_t *kiov,
             unsigned int offset, unsigned int mlen, unsigned int rlen)
{
        kib_rx_t    *rx = private;
        kib_msg_t   *rxmsg = rx->rx_msg;
        int          msg_nob;
        int          rc = 0;
        
        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt ());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch (rxmsg->ibm_type) {
        default:
                LBUG();

        case IBNAL_MSG_IMMEDIATE:
                /* sanity-check the sender's claimed payload size */
                msg_nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
                if (msg_nob > rx->rx_nob) {
                        CERROR ("Immediate message from %s too big: %d(%d)\n",
                                libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
                                msg_nob, rx->rx_nob);
                        rc = -EPROTO;
                        break;
                }

                if (kiov != NULL)
                        lnet_copy_flat2kiov(
                                niov, kiov, offset, 
                                IBNAL_MSG_SIZE, rxmsg,
                                offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                mlen);
                else
                        lnet_copy_flat2iov(
                                niov, iov, offset,
                                IBNAL_MSG_SIZE, rxmsg,
                                offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                mlen);

                lnet_finalize (ni, lntmsg, 0);
                break;

        case IBNAL_MSG_GET_RDMA:
                if (lntmsg != NULL) {
                        /* GET matched: RDMA lntmsg's payload */
                        kibnal_start_active_rdma(IBNAL_MSG_GET_DONE, 0,
                                                 rx, lntmsg, 
                                                 lntmsg->msg_niov, 
                                                 lntmsg->msg_iov, 
                                                 lntmsg->msg_kiov,
                                                 lntmsg->msg_offset, 
                                                 lntmsg->msg_len);
                } else {
                        /* GET didn't match anything */
                        kibnal_start_active_rdma (IBNAL_MSG_GET_DONE, -ENODATA,
                                                  rx, NULL, 0, NULL, NULL, 0, 0);
                }
                break;

        case IBNAL_MSG_PUT_RDMA:
                /* pull the PUT payload from the peer's buffer */
                kibnal_start_active_rdma (IBNAL_MSG_PUT_DONE, 0, rx, lntmsg,
                                          niov, iov, kiov, offset, mlen);
                break;
        }

        kibnal_post_rx(rx, 1, 0);
        return rc;
}
1444
1445 int
1446 kibnal_thread_start (int (*fn)(void *arg), void *arg)
1447 {
1448         long    pid = kernel_thread (fn, arg, 0);
1449
1450         if (pid < 0)
1451                 return ((int)pid);
1452
1453         atomic_inc (&kibnal_data.kib_nthreads);
1454         return (0);
1455 }
1456
/* Thread exit bookkeeping: drop the count taken in kibnal_thread_start() */
void
kibnal_thread_fini (void)
{
        atomic_dec (&kibnal_data.kib_nthreads);
}
1462
/* Record that we've just heard from 'peer' */
void
kibnal_peer_alive (kib_peer_t *peer)
{
        /* This is racy, but everyone's only writing cfs_time_current() */
        peer->ibp_last_alive = cfs_time_current();
        mb();           /* make the new timestamp visible to other CPUs */
}
1470
/* Notify LNET that 'peer' has died, if it has no remaining connections, no
 * connection attempts in flight, and a pending error.  The error is consumed
 * (cleared) under the lock so the notification fires only once. */
void
kibnal_peer_notify (kib_peer_t *peer)
{
        time_t        last_alive = 0;
        int           error = 0;
        unsigned long flags;
        
        read_lock_irqsave(&kibnal_data.kib_global_lock, flags);

        if (list_empty(&peer->ibp_conns) &&
            peer->ibp_accepting == 0 &&
            peer->ibp_connecting == 0 &&
            peer->ibp_error != 0) {
                error = peer->ibp_error;
                peer->ibp_error = 0;
                /* convert the jiffies-based last-alive stamp into
                 * wall-clock seconds for lnet_notify() */
                last_alive = cfs_time_current_sec() -
                             cfs_duration_sec(cfs_time_current() -
                                              peer->ibp_last_alive);
        }
        
        read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
        
        if (error != 0)
                lnet_notify(kibnal_data.kib_ni, peer->ibp_nid, 0, last_alive);
}
1496
void
kibnal_close_conn_locked (kib_conn_t *conn, int error)
{
        /* This just does the immmediate housekeeping, and schedules the
         * connection for the reaper to finish off.
         * Caller holds kib_global_lock exclusively in irq context */
        kib_peer_t   *peer = conn->ibc_peer;

        CDEBUG (error == 0 ? D_NET : D_NETERROR,
                "closing conn to %s: error %d\n", 
                libcfs_nid2str(peer->ibp_nid), error);
        
        LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED ||
                 conn->ibc_state == IBNAL_CONN_CONNECTING);

        /* either way the reaper ends up owning one ref via ibc_list */
        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                /* kib_reaper_conns takes ibc_list's ref */
                list_del (&conn->ibc_list);
        } else {
                /* new ref for kib_reaper_conns */
                kibnal_conn_addref(conn);
        }
        
        if (list_empty (&peer->ibp_conns)) {   /* no more conns */
                if (peer->ibp_persistence == 0 && /* non-persistent peer */
                    kibnal_peer_active(peer))     /* still in peer table */
                        kibnal_unlink_peer_locked (peer);

                peer->ibp_error = error; /* set/clear error on last conn */
        }

        conn->ibc_state = IBNAL_CONN_DEATHROW;

        /* Schedule conn for closing/destruction */
        spin_lock (&kibnal_data.kib_reaper_lock);

        list_add_tail (&conn->ibc_list, &kibnal_data.kib_reaper_conns);
        wake_up (&kibnal_data.kib_reaper_waitq);
                
        spin_unlock (&kibnal_data.kib_reaper_lock);
}
1538
1539 int
1540 kibnal_close_conn (kib_conn_t *conn, int why)
1541 {
1542         unsigned long     flags;
1543         int               count = 0;
1544
1545         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1546
1547         LASSERT (conn->ibc_state >= IBNAL_CONN_CONNECTING);
1548         
1549         if (conn->ibc_state <= IBNAL_CONN_ESTABLISHED) {
1550                 count = 1;
1551                 kibnal_close_conn_locked (conn, why);
1552         }
1553         
1554         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1555         return (count);
1556 }
1557
/* Handle a failed connection attempt ('active' distinguishes connect from
 * accept).  If this was the last attempt in flight and the peer has no live
 * connections, apply exponential reconnect backoff, unlink non-persistent
 * peers, and complete all the peer's blocked transmits with -EHOSTUNREACH. */
void
kibnal_peer_connect_failed (kib_peer_t *peer, int active, int error)
{
        LIST_HEAD        (zombies);
        unsigned long     flags;

        LASSERT(error != 0);

        write_lock_irqsave (&kibnal_data.kib_global_lock, flags);

        if (active) {
                LASSERT (peer->ibp_connecting != 0);
                peer->ibp_connecting--;
        } else {
                LASSERT (peer->ibp_accepting != 0);
                peer->ibp_accepting--;
        }

        if (peer->ibp_connecting != 0 ||
            peer->ibp_accepting != 0) {
                /* another connection attempt under way... */
                write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
                return;
        }

        if (list_empty(&peer->ibp_conns)) {
                /* Say when active connection can be re-attempted */
                /* exponential backoff, clamped to [min, max] tunables */
                peer->ibp_reconnect_interval *= 2;
                peer->ibp_reconnect_interval =
                        MAX(peer->ibp_reconnect_interval,
                            *kibnal_tunables.kib_min_reconnect_interval);
                peer->ibp_reconnect_interval =
                        MIN(peer->ibp_reconnect_interval,
                            *kibnal_tunables.kib_max_reconnect_interval);
                
                peer->ibp_reconnect_time = jiffies + 
                                           peer->ibp_reconnect_interval * HZ;
        
                /* Take peer's blocked transmits; I'll complete
                 * them with error */
                /* NB splice idiom: insert 'zombies' head into the queue,
                 * then unlink the old head, stealing the whole list */
                list_add(&zombies, &peer->ibp_tx_queue);
                list_del_init(&peer->ibp_tx_queue);
                
                if (kibnal_peer_active(peer) &&
                    (peer->ibp_persistence == 0)) {
                        /* failed connection attempt on non-persistent peer */
                        kibnal_unlink_peer_locked (peer);
                }

                peer->ibp_error = error;
        } else {
                /* Can't have blocked transmits if there are connections */
                LASSERT (list_empty(&peer->ibp_tx_queue));
        }
        
        write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

        kibnal_peer_notify(peer);
        
        if (!list_empty (&zombies))
                CDEBUG (D_NETERROR, "Deleting messages for %s: connection failed\n",
                        libcfs_nid2str(peer->ibp_nid));

        kibnal_txlist_done(&zombies, -EHOSTUNREACH);
}
1623
/* Complete a connection attempt (active or passive) with 'status'.
 * On success the conn is installed on its peer's connection list, any
 * blocked transmits are queued on it and all receive buffers are posted.
 * On failure the conn is pushed towards close/ZOMBIE and the peer is
 * told via kibnal_peer_connect_failed(). */
void
kibnal_connreq_done (kib_conn_t *conn, int active, int status)
{
        int               state = conn->ibc_state;
        kib_peer_t       *peer = conn->ibc_peer;
        kib_tx_t         *tx;
        unsigned long     flags;
        int               rc;
        int               i;

        /* The connreq scratch state is finished with, whatever happened */
        if (conn->ibc_connreq != NULL) {
                LIBCFS_FREE (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
                conn->ibc_connreq = NULL;
        }

        switch (state) {
        case IBNAL_CONN_CONNECTING:
                /* conn has a CM comm_id */
                if (status == 0) {
                        /* Install common (active/passive) callback for
                         * disconnect/idle notification */
                        rc = tsIbCmCallbackModify(conn->ibc_comm_id, 
                                                  kibnal_conn_callback,
                                                  conn);
                        LASSERT (rc == 0);
                } else {
                        /* LASSERT (no more CM callbacks) */
                        rc = tsIbCmCallbackModify(conn->ibc_comm_id,
                                                  kibnal_bad_conn_callback,
                                                  conn);
                        LASSERT (rc == 0);
                }
                break;

        case IBNAL_CONN_INIT_QP:
                /* never got as far as the CM; must be a failure */
                LASSERT (status != 0);
                break;

        default:
                LBUG();
        }

        write_lock_irqsave (&kibnal_data.kib_global_lock, flags);

        /* exactly one connection attempt (of the right flavour) must be
         * outstanding on this peer */
        if (active)
                LASSERT (peer->ibp_connecting != 0);
        else
                LASSERT (peer->ibp_accepting != 0);

        if (status == 0 &&                      /* connection established */
            kibnal_peer_active(peer)) {         /* peer not deleted */

                if (active)
                        peer->ibp_connecting--;
                else
                        peer->ibp_accepting--;

                conn->ibc_last_send = jiffies;
                conn->ibc_state = IBNAL_CONN_ESTABLISHED;
                kibnal_peer_alive(peer);

                /* +1 ref for ibc_list; caller(== CM)'s ref remains until
                 * the IB_CM_IDLE callback */
                kibnal_conn_addref(conn);
                list_add (&conn->ibc_list, &peer->ibp_conns);

                peer->ibp_reconnect_interval = 0; /* OK to reconnect at any time */

                /* post blocked sends to the new connection */
                spin_lock (&conn->ibc_lock);

                while (!list_empty (&peer->ibp_tx_queue)) {
                        tx = list_entry (peer->ibp_tx_queue.next, 
                                         kib_tx_t, tx_list);

                        list_del (&tx->tx_list);

                        kibnal_queue_tx_locked (tx, conn);
                }

                spin_unlock (&conn->ibc_lock);

                /* Nuke any dangling conns from a different peer instance... */
                kibnal_close_stale_conns_locked (conn->ibc_peer,
                                                 conn->ibc_incarnation);

                write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

                /* queue up all the receives */
                for (i = 0; i < IBNAL_RX_MSGS; i++) {
                        /* +1 ref for rx desc */
                        kibnal_conn_addref(conn);

                        CDEBUG(D_NET, "RX[%d] %p->%p - "LPX64"\n",
                               i, &conn->ibc_rxs[i], conn->ibc_rxs[i].rx_msg,
                               conn->ibc_rxs[i].rx_vaddr);

                        kibnal_post_rx (&conn->ibc_rxs[i], 0, 0);
                }

                kibnal_check_sends (conn);
                return;
        }

        if (status == 0) {
                /* connection established, but peer was deleted.  Schedule for
                 * reaper to cm_disconnect... */
                status = -ECONNABORTED;
                kibnal_close_conn_locked (conn, status);
        } else {
                /* just waiting for refs to drain */
                conn->ibc_state = IBNAL_CONN_ZOMBIE;
        } 

        write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

        /* NB this decrements ibp_connecting/ibp_accepting and completes
         * the peer's blocked transmits with an error */
        kibnal_peer_connect_failed (conn->ibc_peer, active, status);
}
1742
1743 int
1744 kibnal_accept_connreq (kib_conn_t **connp, tTS_IB_CM_COMM_ID cid,
1745                        kib_msg_t *msg, int nob)
1746 {
1747         kib_conn_t    *conn;
1748         kib_peer_t    *peer;
1749         kib_peer_t    *peer2;
1750         unsigned long  flags;
1751         int            rc;
1752
1753         rc = kibnal_unpack_msg(msg, 0, nob);
1754         if (rc != 0) {
1755                 CERROR("Can't unpack connreq msg: %d\n", rc);
1756                 return -EPROTO;
1757         }
1758
1759         CDEBUG(D_NET, "connreq from %s\n", libcfs_nid2str(msg->ibm_srcnid));
1760
1761         if (msg->ibm_type != IBNAL_MSG_CONNREQ) {
1762                 CERROR("Unexpected connreq msg type: %x from %s\n",
1763                        msg->ibm_type, libcfs_nid2str(msg->ibm_srcnid));
1764                 return -EPROTO;
1765         }
1766                 
1767         if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
1768                 CERROR("Can't accept %s: bad queue depth %d (%d expected)\n",
1769                        libcfs_nid2str(msg->ibm_srcnid), 
1770                        msg->ibm_u.connparams.ibcp_queue_depth, 
1771                        IBNAL_MSG_QUEUE_SIZE);
1772                 return (-EPROTO);
1773         }
1774         
1775         conn = kibnal_create_conn();
1776         if (conn == NULL)
1777                 return (-ENOMEM);
1778
1779         /* assume 'nid' is a new peer */
1780         rc = kibnal_create_peer(&peer, msg->ibm_srcnid);
1781         if (rc != 0) {
1782                 kibnal_conn_decref(conn);
1783                 return (-ENOMEM);
1784         }
1785         
1786         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1787
1788         /* Check I'm the same instance that gave the connection parameters.  
1789          * NB If my incarnation changes after this, the peer will get nuked and
1790          * we'll spot that when the connection is finally added into the peer's
1791          * connlist */
1792         if (!lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
1793                                      msg->ibm_dstnid) ||
1794             msg->ibm_dststamp != kibnal_data.kib_incarnation) {
1795                 write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1796                 
1797                 CERROR("Stale connection params from %s\n",
1798                        libcfs_nid2str(msg->ibm_srcnid));
1799                 kibnal_conn_decref(conn);
1800                 kibnal_peer_decref(peer);
1801                 return -ESTALE;
1802         }
1803
1804         peer2 = kibnal_find_peer_locked(msg->ibm_srcnid);
1805         if (peer2 == NULL) {
1806                 /* Brand new peer */
1807                 LASSERT (peer->ibp_accepting == 0);
1808
1809                 /* peer table takes my ref on peer */
1810                 list_add_tail (&peer->ibp_list,
1811                                kibnal_nid2peerlist(msg->ibm_srcnid));
1812         } else {
1813                 /* tie-break connection race in favour of the higher NID */                
1814                 if (peer2->ibp_connecting != 0 &&
1815                     msg->ibm_srcnid < kibnal_data.kib_ni->ni_nid) {
1816                         write_unlock_irqrestore(&kibnal_data.kib_global_lock,
1817                                                 flags);
1818                         CWARN("Conn race %s\n",
1819                               libcfs_nid2str(peer2->ibp_nid));
1820
1821                         kibnal_conn_decref(conn);
1822                         kibnal_peer_decref(peer);
1823                         return -EALREADY;
1824                 }
1825
1826                 kibnal_peer_decref(peer);
1827                 peer = peer2;
1828         }
1829
1830         /* +1 ref for conn */
1831         kibnal_peer_addref(peer);
1832         peer->ibp_accepting++;
1833
1834         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1835
1836         conn->ibc_peer = peer;
1837         conn->ibc_state = IBNAL_CONN_CONNECTING;
1838         conn->ibc_comm_id = cid;
1839         conn->ibc_incarnation = msg->ibm_srcstamp;
1840         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
1841         conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
1842         conn->ibc_version = msg->ibm_version;
1843
1844         *connp = conn;
1845         return (0);
1846 }
1847
/* Installed on connections that must receive no further CM events (after
 * a failed connect, or once IDLE has been seen); any event arriving here
 * is therefore a fatal logic error. */
tTS_IB_CM_CALLBACK_RETURN
kibnal_bad_conn_callback (tTS_IB_CM_EVENT event,
                          tTS_IB_CM_COMM_ID cid,
                          void *param,
                          void *arg)
{
        CERROR ("Unexpected event %d: conn %p\n", event, arg);
        LBUG ();
        /* not reached */
        return TS_IB_CM_CALLBACK_PROCEED;
}
1858
/* Fail all transmits on 'txs' (one of conn's tx queues) with
 * -ECONNABORTED.  Transmits with sends still outstanding are flagged but
 * left for their send completions to finish; the rest are completed here
 * outside the lock. */
void
kibnal_abort_txs (kib_conn_t *conn, struct list_head *txs)
{
        LIST_HEAD        (zombies); 
        struct list_head *tmp;
        struct list_head *nxt;
        kib_tx_t         *tx;
        unsigned long     flags;

        spin_lock_irqsave (&conn->ibc_lock, flags);

        list_for_each_safe (tmp, nxt, txs) {
                tx = list_entry (tmp, kib_tx_t, tx_list);

                /* sanity check the tx state against the queue it's on:
                 * active txs are sending and/or awaiting passive RDMA;
                 * queued txs are doing neither */
                if (txs == &conn->ibc_active_txs) {
                        LASSERT (tx->tx_passive_rdma ||
                                 !tx->tx_passive_rdma_wait);

                        LASSERT (tx->tx_passive_rdma_wait ||
                                 tx->tx_sending != 0);
                } else {
                        LASSERT (!tx->tx_passive_rdma_wait);
                        LASSERT (tx->tx_sending == 0);
                }

                tx->tx_status = -ECONNABORTED;
                tx->tx_passive_rdma_wait = 0;

                /* still sending?  completion callback finishes it off */
                if (tx->tx_sending == 0) {
                        list_del (&tx->tx_list);
                        list_add (&tx->tx_list, &zombies);
                }
        }

        spin_unlock_irqrestore (&conn->ibc_lock, flags);

        /* complete the collected txs without holding ibc_lock */
        kibnal_txlist_done (&zombies, -ECONNABORTED);
}
1897
/* Common CM callback installed on every established connection (both
 * active and passive) to handle disconnect/idle notification. */
tTS_IB_CM_CALLBACK_RETURN
kibnal_conn_callback (tTS_IB_CM_EVENT event,
                      tTS_IB_CM_COMM_ID cid,
                      void *param,
                      void *arg)
{
        kib_conn_t       *conn = arg;
        int               rc;

        /* Established Connection Notifier */

        switch (event) {
        default:
                /* any unexpected event aborts the connection */
                CDEBUG(D_NETERROR, "Connection %p -> %s ERROR %d\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), event);
                kibnal_close_conn (conn, -ECONNABORTED);
                break;
                
        case TS_IB_CM_DISCONNECTED:
                CDEBUG(D_NETERROR, "Connection %p -> %s DISCONNECTED.\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                kibnal_close_conn (conn, 0);
                break;

        case TS_IB_CM_IDLE:
                /* IDLE is the final CM event: tear down outstanding txs
                 * and release the CM's conn ref */
                CDEBUG(D_NET, "Connection %p -> %s IDLE.\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));

                /* LASSERT (no further callbacks) */
                rc = tsIbCmCallbackModify(cid, kibnal_bad_conn_callback, conn);
                LASSERT (rc == 0);

                /* NB we wait until the connection has closed before
                 * completing outstanding passive RDMAs so we can be sure
                 * the network can't touch the mapped memory any more. */

                kibnal_abort_txs(conn, &conn->ibc_tx_queue);
                kibnal_abort_txs(conn, &conn->ibc_tx_queue_rsrvd);
                kibnal_abort_txs(conn, &conn->ibc_tx_queue_nocred);
                kibnal_abort_txs(conn, &conn->ibc_active_txs);
                
                kibnal_conn_decref(conn);        /* Lose CM's ref */
                break;
        }

        return TS_IB_CM_CALLBACK_PROCEED;
}
1945
/* CM callback driving the passive (accepting) side of connection
 * establishment.  'arg' is NULL until REQ_RECEIVED installs the conn via
 * tsIbCmCallbackModify(); thereafter it is the conn. */
tTS_IB_CM_CALLBACK_RETURN
kibnal_passive_conn_callback (tTS_IB_CM_EVENT event,
                              tTS_IB_CM_COMM_ID cid,
                              void *param,
                              void *arg)
{
        kib_conn_t  *conn = arg;
        int          rc;
        
        switch (event) {
        default:
                if (conn == NULL) {
                        /* no connection yet */
                        CERROR ("Unexpected event: %d\n", event);
                        return TS_IB_CM_CALLBACK_ABORT;
                }
                
                /* conn exists but establishment failed (IDLE) or some
                 * unexpected event arrived: fail the connreq and drop
                 * the CM's ref */
                CERROR ("%s event %p -> %s: %d\n",
                        (event == TS_IB_CM_IDLE) ? "IDLE" : "Unexpected",
                        conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), event);
                kibnal_connreq_done(conn, 0, -ECONNABORTED);
                kibnal_conn_decref(conn); /* drop CM's ref */
                return TS_IB_CM_CALLBACK_ABORT;
                
        case TS_IB_CM_REQ_RECEIVED: {
                struct ib_cm_req_received_param *req = param;
                kib_msg_t                       *msg = req->remote_private_data;

                LASSERT (conn == NULL);

                /* Don't really know srcnid until successful unpack */
                CDEBUG(D_NET, "REQ from ?%s?\n", libcfs_nid2str(msg->ibm_srcnid));

                rc = kibnal_accept_connreq(&conn, cid, msg, 
                                           req->remote_private_data_len);
                if (rc != 0) {
                        CERROR ("Can't accept ?%s?: %d\n",
                                libcfs_nid2str(msg->ibm_srcnid), rc);
                        return TS_IB_CM_CALLBACK_ABORT;
                }

                /* update 'arg' for next callback */
                rc = tsIbCmCallbackModify(cid, kibnal_passive_conn_callback, conn);
                LASSERT (rc == 0);

                /* build the CONNACK reply in the CM's private data buffer */
                msg = req->accept_param.reply_private_data;
                kibnal_init_msg(msg, IBNAL_MSG_CONNACK,
                                sizeof(msg->ibm_u.connparams));

                msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;

                kibnal_pack_msg(msg, conn->ibc_version, 0, 
                                conn->ibc_peer->ibp_nid, 
                                conn->ibc_incarnation);

                req->accept_param.qp                     = conn->ibc_qp;
                req->accept_param.reply_private_data_len = msg->ibm_nob;
                req->accept_param.responder_resources    = IBNAL_RESPONDER_RESOURCES;
                req->accept_param.initiator_depth        = IBNAL_RESPONDER_RESOURCES;
                req->accept_param.rnr_retry_count        = IBNAL_RNR_RETRY;
                req->accept_param.flow_control           = IBNAL_FLOW_CONTROL;

                CDEBUG(D_NET, "Proceeding\n");
                return TS_IB_CM_CALLBACK_PROCEED; /* CM takes my ref on conn */
        }

        case TS_IB_CM_ESTABLISHED:
                LASSERT (conn != NULL);
                CWARN("Connection %p -> %s ESTABLISHED.\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));

                /* success: installs conn on the peer; CM's ref persists
                 * until IDLE */
                kibnal_connreq_done(conn, 0, 0);
                return TS_IB_CM_CALLBACK_PROCEED;
        }
}
2021
/* CM callback driving the active (connecting) side of connection
 * establishment; 'arg' is the conn whose ref the CM holds. */
tTS_IB_CM_CALLBACK_RETURN
kibnal_active_conn_callback (tTS_IB_CM_EVENT event,
                             tTS_IB_CM_COMM_ID cid,
                             void *param,
                             void *arg)
{
        kib_conn_t    *conn = arg;
        unsigned long  flags;

        switch (event) {
        case TS_IB_CM_REP_RECEIVED: {
                /* validate the peer's CONNACK before letting the CM
                 * complete the handshake */
                struct ib_cm_rep_received_param *rep = param;
                kib_msg_t                       *msg = rep->remote_private_data;
                int                              nob = rep->remote_private_data_len;
                int                              rc;

                rc = kibnal_unpack_msg(msg, conn->ibc_version, nob);
                if (rc != 0) {
                        CERROR ("Error %d unpacking conn ack from %s\n",
                                rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        kibnal_connreq_done(conn, 1, rc);
                        kibnal_conn_decref(conn); /* drop CM's ref */
                        return TS_IB_CM_CALLBACK_ABORT;
                }

                if (msg->ibm_type != IBNAL_MSG_CONNACK) {
                        CERROR ("Unexpected conn ack type %d from %s\n",
                                msg->ibm_type, 
                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        kibnal_connreq_done(conn, 1, -EPROTO);
                        kibnal_conn_decref(conn); /* drop CM's ref */
                        return TS_IB_CM_CALLBACK_ABORT;
                }

                /* check both NIDs and both incarnation stamps to reject a
                 * reply meant for a previous instance of either end */
                if (!lnet_ptlcompat_matchnid(conn->ibc_peer->ibp_nid,
                                             msg->ibm_srcnid) ||
                    !lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
                                             msg->ibm_dstnid) ||
                    msg->ibm_srcstamp != conn->ibc_incarnation ||
                    msg->ibm_dststamp != kibnal_data.kib_incarnation) {
                        CERROR("Stale conn ack from %s\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        kibnal_connreq_done(conn, 1, -ESTALE);
                        kibnal_conn_decref(conn); /* drop CM's ref */
                        return TS_IB_CM_CALLBACK_ABORT;
                }

                if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
                        CERROR ("Bad queue depth %d from %s\n",
                                msg->ibm_u.connparams.ibcp_queue_depth,
                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        kibnal_connreq_done(conn, 1, -EPROTO);
                        kibnal_conn_decref(conn); /* drop CM's ref */
                        return TS_IB_CM_CALLBACK_ABORT;
                }
                                
                CDEBUG(D_NET, "Connection %p -> %s REP_RECEIVED.\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));

                conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
                conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
                return TS_IB_CM_CALLBACK_PROCEED;
        }

        case TS_IB_CM_ESTABLISHED:
                CWARN("Connection %p -> %s ESTABLISHED\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));

                /* success: CM's ref persists until IDLE */
                kibnal_connreq_done(conn, 1, 0);
                return TS_IB_CM_CALLBACK_PROCEED;

        case TS_IB_CM_IDLE:
                CDEBUG(D_NETERROR, "Connection %p -> %s IDLE\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                /* I assume this connection attempt was rejected because the
                 * peer found a stale QP; I'll just try again */
                write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
                kibnal_schedule_active_connect_locked(conn->ibc_peer);
                write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);

                kibnal_connreq_done(conn, 1, -ECONNABORTED);
                kibnal_conn_decref(conn); /* drop CM's ref */
                return TS_IB_CM_CALLBACK_ABORT;

        default:
                CDEBUG(D_NETERROR, "Connection %p -> %s ERROR %d\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), event);
                kibnal_connreq_done(conn, 1, -ECONNABORTED);
                kibnal_conn_decref(conn); /* drop CM's ref */
                return TS_IB_CM_CALLBACK_ABORT;
        }
}
2114
/* IB path-record query completion: with the path resolved, build the
 * CONNREQ message and start the CM connect.  The callback holds a conn
 * ref which is handed on to kibnal_active_conn_callback on success, or
 * dropped here on failure.  Returns non-zero to suppress further path
 * callbacks. */
int
kibnal_pathreq_callback (tTS_IB_CLIENT_QUERY_TID tid, int status,
                          struct ib_path_record *resp, int remaining,
                          void *arg)
{
        kib_conn_t *conn = arg;
        kib_peer_t *peer = conn->ibc_peer;
        kib_msg_t  *msg = &conn->ibc_connreq->cr_msg;

        if (status != 0) {
                CDEBUG (D_NETERROR, "Pathreq %p -> %s failed: %d\n",
                        conn, libcfs_nid2str(peer->ibp_nid), status);
                kibnal_connreq_done(conn, 1, status);
                kibnal_conn_decref(conn); /* drop callback's ref */
                return 1;    /* non-zero prevents further callbacks */
        }

        conn->ibc_connreq->cr_path = *resp;

        /* build the CONNREQ to travel as the CM's private data */
        kibnal_init_msg(msg, IBNAL_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
        msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
        kibnal_pack_msg(msg, conn->ibc_version, 0, 
                        peer->ibp_nid, conn->ibc_incarnation);

        conn->ibc_connreq->cr_connparam = (struct ib_cm_active_param) {
                .qp                   = conn->ibc_qp,
                .req_private_data     = msg,
                .req_private_data_len = msg->ibm_nob,
                .responder_resources  = IBNAL_RESPONDER_RESOURCES,
                .initiator_depth      = IBNAL_RESPONDER_RESOURCES,
                .retry_count          = IBNAL_RETRY,
                .rnr_retry_count      = IBNAL_RNR_RETRY,
                .cm_response_timeout  = *kibnal_tunables.kib_timeout,
                .max_cm_retries       = IBNAL_CM_RETRY,
                .flow_control         = IBNAL_FLOW_CONTROL,
        };

        /* XXX set timeout just like SDP!!!*/
        conn->ibc_connreq->cr_path.packet_life = 13;
        
        /* Flag I'm getting involved with the CM... */
        conn->ibc_state = IBNAL_CONN_CONNECTING;

        CDEBUG(D_NET, "Connecting to, service id "LPX64", on %s\n",
               conn->ibc_connreq->cr_svcrsp.ibsr_svc_id, 
               libcfs_nid2str(peer->ibp_nid));

        /* kibnal_connect_callback gets my conn ref */
        status = ib_cm_connect (&conn->ibc_connreq->cr_connparam, 
                                &conn->ibc_connreq->cr_path, NULL,
                                conn->ibc_connreq->cr_svcrsp.ibsr_svc_id, 0,
                                kibnal_active_conn_callback, conn,
                                &conn->ibc_comm_id);
        if (status != 0) {
                CERROR ("Connect %p -> %s failed: %d\n",
                        conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), status);
                /* Back out state change: I've not got a CM comm_id yet... */
                conn->ibc_state = IBNAL_CONN_INIT_QP;
                kibnal_connreq_done(conn, 1, status);
                kibnal_conn_decref(conn); /* Drop callback's ref */
        }
        
        return 1;    /* non-zero to prevent further callbacks */
}
2179
/* Start an active connection attempt to 'peer': allocate conn + connreq
 * state, query the service, then kick off the path-record lookup whose
 * callback continues the handshake.  On any failure the attempt is
 * completed with an error via kibnal_connreq_done(). */
void
kibnal_connect_peer (kib_peer_t *peer)
{
        kib_conn_t  *conn;
        int          rc;

        conn = kibnal_create_conn();
        if (conn == NULL) {
                CERROR ("Can't allocate conn\n");
                kibnal_peer_connect_failed (peer, 1, -ENOMEM);
                return;
        }

        conn->ibc_peer = peer;
        kibnal_peer_addref(peer);

        LIBCFS_ALLOC (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
        if (conn->ibc_connreq == NULL) {
                CERROR ("Can't allocate connreq\n");
                kibnal_connreq_done(conn, 1, -ENOMEM);
                kibnal_conn_decref(conn); /* drop my ref */
                return;
        }

        memset(conn->ibc_connreq, 0, sizeof (*conn->ibc_connreq));

        rc = kibnal_make_svcqry(conn);
        if (rc != 0) {
                kibnal_connreq_done (conn, 1, rc);
                kibnal_conn_decref(conn); /* drop my ref */
                return;
        }

        /* my own GID is needed for the path-record request below */
        rc = ib_cached_gid_get(kibnal_data.kib_device,
                               kibnal_data.kib_port, 0,
                               conn->ibc_connreq->cr_gid);
        LASSERT (rc == 0);

        /* kibnal_pathreq_callback gets my conn ref */
        rc = tsIbPathRecordRequest (kibnal_data.kib_device,
                                    kibnal_data.kib_port,
                                    conn->ibc_connreq->cr_gid,
                                    conn->ibc_connreq->cr_svcrsp.ibsr_svc_gid,
                                    conn->ibc_connreq->cr_svcrsp.ibsr_svc_pkey,
                                    0,
                                    *kibnal_tunables.kib_timeout * HZ,
                                    0,
                                    kibnal_pathreq_callback, conn, 
                                    &conn->ibc_connreq->cr_tid);
        if (rc == 0)
                return; /* callback now has my ref on conn */

        CERROR ("Path record request %p -> %s failed: %d\n",
                conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
        kibnal_connreq_done(conn, 1, rc);
        kibnal_conn_decref(conn); /* drop my ref */
}
2237
2238 int
2239 kibnal_check_txs (kib_conn_t *conn, struct list_head *txs)
2240 {
2241         kib_tx_t          *tx;
2242         struct list_head  *ttmp;
2243         unsigned long      flags;
2244         int                timed_out = 0;
2245
2246         spin_lock_irqsave (&conn->ibc_lock, flags);
2247
2248         list_for_each (ttmp, txs) {
2249                 tx = list_entry (ttmp, kib_tx_t, tx_list);
2250
2251                 if (txs == &conn->ibc_active_txs) {
2252                         LASSERT (tx->tx_passive_rdma ||
2253                                  !tx->tx_passive_rdma_wait);
2254
2255                         LASSERT (tx->tx_passive_rdma_wait ||
2256                                  tx->tx_sending != 0);
2257                 } else {
2258                         LASSERT (!tx->tx_passive_rdma_wait);
2259                         LASSERT (tx->tx_sending == 0);
2260                 }
2261                 
2262                 if (time_after_eq (jiffies, tx->tx_deadline)) {
2263                         timed_out = 1;
2264                         break;
2265                 }
2266         }
2267
2268         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2269         return timed_out;
2270 }
2271
2272 int
2273 kibnal_conn_timed_out (kib_conn_t *conn)
2274 {
2275         return  kibnal_check_txs(conn, &conn->ibc_tx_queue) ||
2276                 kibnal_check_txs(conn, &conn->ibc_tx_queue_rsrvd) ||
2277                 kibnal_check_txs(conn, &conn->ibc_tx_queue_nocred) ||
2278                 kibnal_check_txs(conn, &conn->ibc_active_txs);
2279 }
2280
/* Scan peer hash bucket 'idx' for established connections with timed-out
 * transmits and close them.  The global lock must be dropped before
 * closing, so the whole bucket is re-scanned from the top after each
 * close (the closed conn leaves ibp_conns, guaranteeing progress). */
void
kibnal_check_conns (int idx)
{
        struct list_head  *peers = &kibnal_data.kib_peers[idx];
        struct list_head  *ptmp;
        kib_peer_t        *peer;
        kib_conn_t        *conn;
        struct list_head  *ctmp;
        unsigned long      flags;

 again:
        /* NB. We expect to have a look at all the peers and not find any
         * rdmas to time out, so we just use a shared lock while we
         * take a look... */
        read_lock_irqsave(&kibnal_data.kib_global_lock, flags);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, kib_peer_t, ibp_list);

                list_for_each (ctmp, &peer->ibp_conns) {
                        conn = list_entry (ctmp, kib_conn_t, ibc_list);

                        LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);


                        /* In case we have enough credits to return via a
                         * NOOP, but there were no non-blocking tx descs
                         * free to do it last time... */
                        kibnal_check_sends(conn);

                        if (!kibnal_conn_timed_out(conn))
                                continue;

                        /* +1 ref keeps conn alive across the unlock */
                        kibnal_conn_addref(conn);

                        read_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                               flags);

                        CERROR("Timed out RDMA with %s\n",
                               libcfs_nid2str(peer->ibp_nid));

                        kibnal_close_conn (conn, -ETIMEDOUT);
                        kibnal_conn_decref(conn);

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
}
2332
/* Move a DEATHROW conn to ZOMBIE and issue the CM disconnect; the
 * disconnect completes asynchronously via kibnal_conn_callback. */
void
kibnal_terminate_conn (kib_conn_t *conn)
{
        int           rc;

        CDEBUG(D_NET, "conn %p\n", conn);
        LASSERT (conn->ibc_state == IBNAL_CONN_DEATHROW);
        conn->ibc_state = IBNAL_CONN_ZOMBIE;

        /* best effort: log and carry on if the CM refuses */
        rc = ib_cm_disconnect (conn->ibc_comm_id);
        if (rc != 0)
                CERROR ("Error %d disconnecting conn %p -> %s\n",
                        rc, conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));

        kibnal_peer_notify(conn->ibc_peer);
}
2349
/* Reaper thread.  Two jobs: (1) finish off connections queued on
 * kib_reaper_conns (disconnect DEATHROW conns, destroy ZOMBIE/INIT_QP
 * conns); (2) periodically sweep a slice of the peer hash table looking
 * for connections with timed-out RDMAs.  Returns 0 when the thread
 * exits at shutdown. */
int
kibnal_reaper (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        kib_conn_t        *conn;
        int                timeout;
        int                i;
        int                peer_index = 0;              /* next hash bucket to sweep */
        unsigned long      deadline = jiffies;          /* when the next sweep is due */
        
        cfs_daemonize ("kibnal_reaper");
        cfs_block_allsigs ();

        init_waitqueue_entry (&wait, current);

        spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);

        while (!kibnal_data.kib_shutdown) {
                if (!list_empty (&kibnal_data.kib_reaper_conns)) {
                        conn = list_entry (kibnal_data.kib_reaper_conns.next,
                                           kib_conn_t, ibc_list);
                        list_del (&conn->ibc_list);
                        
                        /* drop the lock before doing any real work on the conn */
                        spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);

                        switch (conn->ibc_state) {
                        case IBNAL_CONN_DEATHROW:
                                LASSERT (conn->ibc_comm_id != TS_IB_CM_COMM_ID_INVALID);
                                /* Disconnect: conn becomes a zombie in the
                                 * callback and last ref reschedules it
                                 * here... */
                                kibnal_terminate_conn(conn);
                                kibnal_conn_decref(conn);
                                break;

                        case IBNAL_CONN_INIT_QP:
                        case IBNAL_CONN_ZOMBIE:
                                /* no more refs: safe to free the conn outright */
                                kibnal_destroy_conn (conn);
                                break;
                                
                        default:
                                CERROR ("Bad conn %p state: %d\n",
                                        conn, conn->ibc_state);
                                LBUG();
                        }

                        /* re-take the lock and see if more conns were queued */
                        spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
                        continue;
                }

                spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);

                /* careful with the jiffy wrap: the signed subtraction keeps
                 * the comparison correct even when jiffies wraps around */
                while ((timeout = (int)(deadline - jiffies)) <= 0) {
                        const int n = 4;
                        const int p = 1;
                        int       chunk = kibnal_data.kib_peer_hash_size;
                        
                        /* Time to check for RDMA timeouts on a few more
                         * peers: I do checks every 'p' seconds on a
                         * proportion of the peer table and I need to check
                         * every connection 'n' times within a timeout
                         * interval, to ensure I detect a timeout on any
                         * connection within (n+1)/n times the timeout
                         * interval. */

                        if (*kibnal_tunables.kib_timeout > n * p)
                                chunk = (chunk * n * p) / 
                                        *kibnal_tunables.kib_timeout;
                        if (chunk == 0)
                                chunk = 1;      /* always sweep at least one bucket */

                        for (i = 0; i < chunk; i++) {
                                kibnal_check_conns (peer_index);
                                peer_index = (peer_index + 1) % 
                                             kibnal_data.kib_peer_hash_size;
                        }

                        deadline += p * HZ;
                }

                /* record when this thread expects to wake (for watchdogging) */
                kibnal_data.kib_reaper_waketime = jiffies + timeout;

                /* sleep until the next sweep deadline or until woken for
                 * newly queued reaper work */
                set_current_state (TASK_INTERRUPTIBLE);
                add_wait_queue (&kibnal_data.kib_reaper_waitq, &wait);

                schedule_timeout (timeout);

                set_current_state (TASK_RUNNING);
                remove_wait_queue (&kibnal_data.kib_reaper_waitq, &wait);

                spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
        }

        spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);

        kibnal_thread_fini ();
        return (0);
}
2450
/* Connection daemon thread (one of *kib_n_connd).  Services two queues:
 * incoming accept sockets awaiting a service query handshake
 * (kib_connd_acceptq) and peers awaiting an active connection attempt
 * (kib_connd_peers).  Returns 0 when the thread exits at shutdown. */
int
kibnal_connd (void *arg)
{
        long               id = (long)arg;     /* thread index, used in the name */
        char               name[16];
        wait_queue_t       wait;
        unsigned long      flags;
        kib_peer_t        *peer;
        kib_acceptsock_t  *as;
        int                did_something;

        snprintf(name, sizeof(name), "kibnal_connd_%02ld", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        init_waitqueue_entry (&wait, current);

        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);

        while (!kibnal_data.kib_shutdown) {
                did_something = 0;

                /* service one pending incoming service query */
                if (!list_empty (&kibnal_data.kib_connd_acceptq)) {
                        as = list_entry (kibnal_data.kib_connd_acceptq.next,
                                         kib_acceptsock_t, ibas_list);
                        list_del (&as->ibas_list);
                        
                        /* drop the lock around the (blocking) socket handshake */
                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

                        kibnal_handle_svcqry(as->ibas_sock);
                        kibnal_free_acceptsock(as);
                        
                        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
                        did_something = 1;
                }
                        
                /* Only handle an outgoing connection request if there is someone left
                 * to handle an incoming svcqry (kib_connd_connecting counts the
                 * connd threads currently busy with an active connect) */
                if (!list_empty (&kibnal_data.kib_connd_peers) &&
                    ((kibnal_data.kib_connd_connecting + 1) < 
                     *kibnal_tunables.kib_n_connd)) {
                        peer = list_entry (kibnal_data.kib_connd_peers.next,
                                           kib_peer_t, ibp_connd_list);
                        
                        list_del_init (&peer->ibp_connd_list);
                        kibnal_data.kib_connd_connecting++;
                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

                        /* peer holds a ref for its place on the queue; drop it
                         * once the connect attempt completes */
                        kibnal_connect_peer (peer);
                        kibnal_peer_decref(peer);

                        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                        did_something = 1;
                        kibnal_data.kib_connd_connecting--;
                }

                /* loop straight back if any work was found; only sleep when
                 * both queues were empty on this pass */
                if (did_something)
                        continue;

                set_current_state (TASK_INTERRUPTIBLE);
                add_wait_queue_exclusive(&kibnal_data.kib_connd_waitq, &wait);

                spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

                schedule();

                set_current_state (TASK_RUNNING);
                remove_wait_queue (&kibnal_data.kib_connd_waitq, &wait);

                spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
        }

        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

        kibnal_thread_fini ();
        return (0);
}
2528
/* Scheduler thread (one of several, indexed by 'id').  Drains deferred
 * tx completions from kib_sched_txq and received messages from
 * kib_sched_rxq, dropping the IRQ-safe scheduler lock around each
 * callback.  Returns 0 when the thread exits at shutdown. */
int
kibnal_scheduler(void *arg)
{
        long            id = (long)arg;        /* thread index, used in the name */
        char            name[16];
        kib_rx_t       *rx;
        kib_tx_t       *tx;
        unsigned long   flags;
        int             rc;
        int             counter = 0;           /* iterations since last resched */
        int             did_something;

        snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);

        while (!kibnal_data.kib_shutdown) {
                did_something = 0;

                /* drain all deferred tx completions first.
                 * NOTE(review): this path does not set did_something (the rx
                 * path below does); after draining only txs the thread takes
                 * the wait branch, which is safe because the wait condition
                 * re-checks both queues before sleeping. */
                while (!list_empty(&kibnal_data.kib_sched_txq)) {
                        tx = list_entry(kibnal_data.kib_sched_txq.next,
                                        kib_tx_t, tx_list);
                        list_del(&tx->tx_list);
                        /* drop the lock around the finalization callback */
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);
                        kibnal_tx_done(tx);

                        spin_lock_irqsave(&kibnal_data.kib_sched_lock,
                                          flags);
                }

                /* handle at most one rx per pass so txs stay drained */
                if (!list_empty(&kibnal_data.kib_sched_rxq)) {
                        rx = list_entry(kibnal_data.kib_sched_rxq.next,
                                        kib_rx_t, rx_list);
                        list_del(&rx->rx_list);
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);

                        kibnal_rx(rx);

                        did_something = 1;
                        spin_lock_irqsave(&kibnal_data.kib_sched_lock,
                                          flags);
                }

                /* nothing to do or hogging CPU */
                if (!did_something || counter++ == IBNAL_RESCHED) {
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);
                        counter = 0;

                        if (!did_something) {
                                /* idle: sleep until new work arrives or shutdown */
                                rc = wait_event_interruptible_exclusive(
                                        kibnal_data.kib_sched_waitq,
                                        !list_empty(&kibnal_data.kib_sched_txq) || 
                                        !list_empty(&kibnal_data.kib_sched_rxq) || 
                                        kibnal_data.kib_shutdown);
                        } else {
                                /* busy for IBNAL_RESCHED passes: yield the CPU */
                                our_cond_resched();
                        }

                        spin_lock_irqsave(&kibnal_data.kib_sched_lock,
                                          flags);
                }
        }

        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);

        kibnal_thread_fini();
        return (0);
}