/* Source: fs/lustre-release.git — lnet/klnds/openiblnd/openiblnd_cb.c
 * (Whamcloud gitweb export, commit be869e9eea52f7b51b91df3e6148e49b346887db) */
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include "openiblnd.h"
25
26 /*
27  *  LIB functions follow
28  *
29  */
30 void
31 kibnal_schedule_tx_done (kib_tx_t *tx)
32 {
33         unsigned long flags;
34
35         spin_lock_irqsave (&kibnal_data.kib_sched_lock, flags);
36
37         list_add_tail(&tx->tx_list, &kibnal_data.kib_sched_txq);
38         wake_up (&kibnal_data.kib_sched_waitq);
39
40         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
41 }
42
/* Complete a transmit descriptor: deregister any memory mapping, drop
 * the descriptor's conn ref, return it to the global idle pool and
 * finalize the (up to 2) lnet messages it carried.  Must not be called
 * while sends or passive RDMA are still outstanding on 'tx'; defers
 * itself to a scheduler thread when invoked from IRQ context. */
void
kibnal_tx_done (kib_tx_t *tx)
{
        lnet_msg_t      *lntmsg[2];
        unsigned long    flags;
        int              i;
        int              rc;

        LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting callback */
        LASSERT (!tx->tx_passive_rdma_wait);    /* mustn't be awaiting RDMA */

        if (in_interrupt()) {
                /* can't deregister memory/flush FMAs/finalize in IRQ context... */
                kibnal_schedule_tx_done(tx);
                return;
        }

        /* Tear down whichever kind of mapping (if any) the tx holds */
        switch (tx->tx_mapped) {
        default:
                LBUG();

        case KIB_TX_UNMAPPED:
                break;
                
        case KIB_TX_MAPPED:
                rc = ib_memory_deregister(tx->tx_md.md_handle.mr);
                LASSERT (rc == 0);
                tx->tx_mapped = KIB_TX_UNMAPPED;
                break;

#if IBNAL_FMR
        case KIB_TX_MAPPED_FMR:
                rc = ib_fmr_deregister(tx->tx_md.md_handle.fmr);
                LASSERT (rc == 0);

#ifndef USING_TSAPI
                /* Somewhat belt-and-braces since the tx's conn has closed if
                 * this was a passive RDMA waiting to complete... */
                if (tx->tx_status != 0)
                        ib_fmr_pool_force_flush(kibnal_data.kib_fmr_pool);
#endif
                tx->tx_mapped = KIB_TX_UNMAPPED;
                break;
#endif
        }

        /* tx may have up to 2 ptlmsgs to finalise; detach them (and the
         * completion status) before recycling the descriptor */
        lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
        lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
        rc = tx->tx_status;

        if (tx->tx_conn != NULL) {
                kibnal_conn_decref(tx->tx_conn);
                tx->tx_conn = NULL;
        }

        /* reset for next use */
        tx->tx_nsp = 0;
        tx->tx_passive_rdma = 0;
        tx->tx_status = 0;

        spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);

        list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_txs);

        spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);

        /* delay finalize until my descs have been freed */
        for (i = 0; i < 2; i++) {
                if (lntmsg[i] == NULL)
                        continue;

                lnet_finalize (kibnal_data.kib_ni, lntmsg[i], rc);
        }
}
117
118 kib_tx_t *
119 kibnal_get_idle_tx (void) 
120 {
121         unsigned long  flags;
122         kib_tx_t      *tx;
123         
124         spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);
125
126         if (list_empty (&kibnal_data.kib_idle_txs)) {
127                 spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
128                 return NULL;
129         }
130
131         tx = list_entry (kibnal_data.kib_idle_txs.next, kib_tx_t, tx_list);
132         list_del (&tx->tx_list);
133
134         /* Allocate a new passive RDMA completion cookie.  It might not be
135          * needed, but we've got a lock right now and we're unlikely to
136          * wrap... */
137         tx->tx_passive_rdma_cookie = kibnal_data.kib_next_tx_cookie++;
138
139         spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
140
141         LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
142         LASSERT (tx->tx_nsp == 0);
143         LASSERT (tx->tx_sending == 0);
144         LASSERT (tx->tx_status == 0);
145         LASSERT (tx->tx_conn == NULL);
146         LASSERT (!tx->tx_passive_rdma);
147         LASSERT (!tx->tx_passive_rdma_wait);
148         LASSERT (tx->tx_lntmsg[0] == NULL);
149         LASSERT (tx->tx_lntmsg[1] == NULL);
150
151         return tx;
152 }
153
/* A peer has reported (via a PUT_DONE/GET_DONE message) that the
 * passive RDMA identified by 'cookie' completed with 'status'.  Find
 * the matching tx on the conn's active list, mark it no longer waiting,
 * and complete it if no send callbacks remain outstanding.  Logs an
 * error if no tx matches (e.g. a late completion after conn teardown). */
void
kibnal_complete_passive_rdma(kib_conn_t *conn, __u64 cookie, int status)
{
        struct list_head *ttmp;
        unsigned long     flags;
        int               idle;

        spin_lock_irqsave (&conn->ibc_lock, flags);

        list_for_each (ttmp, &conn->ibc_active_txs) {
                kib_tx_t *tx = list_entry(ttmp, kib_tx_t, tx_list);

                /* active txs are either waiting for passive RDMA or
                 * still have send callbacks outstanding */
                LASSERT (tx->tx_passive_rdma ||
                         !tx->tx_passive_rdma_wait);

                LASSERT (tx->tx_passive_rdma_wait ||
                         tx->tx_sending != 0);

                if (!tx->tx_passive_rdma_wait ||
                    tx->tx_passive_rdma_cookie != cookie)
                        continue;

                CDEBUG(D_NET, "Complete %p "LPD64": %d\n", tx, cookie, status);

                /* XXX Set mlength of reply here */

                tx->tx_status = status;
                tx->tx_passive_rdma_wait = 0;
                idle = (tx->tx_sending == 0);

                if (idle)
                        list_del (&tx->tx_list);

                /* drop the lock before (possibly) completing the tx; the
                 * list walk ends here so this is safe */
                spin_unlock_irqrestore (&conn->ibc_lock, flags);

                /* I could be racing with tx callbacks.  It's whoever
                 * _makes_ tx idle that frees it */
                if (idle)
                        kibnal_tx_done (tx);
                return;
        }
                
        spin_unlock_irqrestore (&conn->ibc_lock, flags);

        CERROR ("Unmatched (late?) RDMA completion "LPX64" from %s\n",
                cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
}
201
/* Re-post receive buffer 'rx' on its connection's QP.  'credit'
 * returns a normal flow-control credit to the peer; 'rsrvd_credit'
 * returns a reserved (RDMA-reply) credit -- only valid on protocol
 * versions that reserve reply buffers.  On failure the conn is closed
 * (if it was still established) and rx's ref on the conn is dropped. */
void
kibnal_post_rx (kib_rx_t *rx, int credit, int rsrvd_credit)
{
        kib_conn_t   *conn = rx->rx_conn;
        int           rc;
        unsigned long flags;

        LASSERT(!rsrvd_credit ||
                conn->ibc_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);

        rx->rx_gl = (struct ib_gather_scatter) {
                .address = rx->rx_vaddr,
                .length  = IBNAL_MSG_SIZE,
                .key     = conn->ibc_rx_pages->ibp_lkey,
        };

        rx->rx_sp = (struct ib_receive_param) {
                .work_request_id        = kibnal_ptr2wreqid(rx, 1),
                .scatter_list           = &rx->rx_gl,
                .num_scatter_entries    = 1,
                .device_specific        = NULL,
                .signaled               = 1,
        };

        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
        LASSERT (rx->rx_nob >= 0);              /* not posted */
        rx->rx_nob = -1;                        /* is now */
        /* ensure rx_nob is visible before the buffer can complete */
        mb();

        if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
                rc = -ECONNABORTED;
        else
                rc = kibnal_ib_receive(conn->ibc_qp, &rx->rx_sp);

        if (rc == 0) {
                /* buffer is back with the peer: give back the credit(s)
                 * and see if that lets any queued sends proceed */
                if (credit || rsrvd_credit) {
                        spin_lock_irqsave(&conn->ibc_lock, flags);

                        if (credit)
                                conn->ibc_outstanding_credits++;
                        if (rsrvd_credit)
                                conn->ibc_reserved_credits++;
                        
                        spin_unlock_irqrestore(&conn->ibc_lock, flags);

                        kibnal_check_sends(conn);
                }
                return;
        }

        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                CERROR ("Error posting receive -> %s: %d\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
                kibnal_close_conn (rx->rx_conn, rc);
        } else {
                CDEBUG (D_NET, "Error posting receive -> %s: %d\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
        }

        /* Drop rx's ref */
        kibnal_conn_decref(conn);
}
264
/* Completion handler for a posted receive.  Validates the incoming
 * message (unpack, src/dst nid + incarnation stamps), absorbs any
 * returned flow-control credits, then either handles the message
 * inline (NOOP, RDMA-done) or queues the rx for kibnal_rx() in thread
 * context.  Any failure closes the conn and drops rx's conn ref
 * without re-posting the buffer. */
void
kibnal_rx_callback (struct ib_cq_entry *e)
{
        kib_rx_t     *rx = (kib_rx_t *)kibnal_wreqid2ptr(e->work_request_id);
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        int           credits;
        unsigned long flags;
        int           rc;
        int           err = -ECONNABORTED;

        CDEBUG (D_NET, "rx %p conn %p\n", rx, conn);
        LASSERT (rx->rx_nob < 0);               /* was posted */
        rx->rx_nob = 0;                         /* isn't now */
        mb();

        /* receives complete with error in any case after we've started
         * closing the QP */
        if (conn->ibc_state >= IBNAL_CONN_DEATHROW)
                goto failed;

        /* We don't post receives until the conn is established */
        LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);

        if (e->status != IB_COMPLETION_STATUS_SUCCESS) {
                CERROR("Rx from %s failed: %d\n", 
                       libcfs_nid2str(conn->ibc_peer->ibp_nid), e->status);
                goto failed;
        }

        LASSERT (e->bytes_transferred >= 0);
        rx->rx_nob = e->bytes_transferred;
        mb();

        rc = kibnal_unpack_msg(msg, conn->ibc_version, rx->rx_nob);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from %s\n",
                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                goto failed;
        }

        /* reject messages that don't match this conn's peer/instance */
        if (!lnet_ptlcompat_matchnid(conn->ibc_peer->ibp_nid,
                                     msg->ibm_srcnid) ||
            !lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
                                     msg->ibm_dstnid) ||
            msg->ibm_srcstamp != conn->ibc_incarnation ||
            msg->ibm_dststamp != kibnal_data.kib_incarnation) {
                CERROR ("Stale rx from %s\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
                err = -ESTALE;
                goto failed;
        }

        /* Have I received credits that will let me send? */
        credits = msg->ibm_credits;
        if (credits != 0) {
                spin_lock_irqsave(&conn->ibc_lock, flags);
                conn->ibc_credits += credits;
                spin_unlock_irqrestore(&conn->ibc_lock, flags);
                
                kibnal_check_sends(conn);
        }

        switch (msg->ibm_type) {
        case IBNAL_MSG_NOOP:
                /* nothing to do beyond re-posting the buffer */
                kibnal_post_rx (rx, 1, 0);
                return;

        case IBNAL_MSG_IMMEDIATE:
                break;
                
        case IBNAL_MSG_PUT_RDMA:
        case IBNAL_MSG_GET_RDMA:
                CDEBUG(D_NET, "%d RDMA: cookie "LPX64", key %x, addr "LPX64", nob %d\n",
                       msg->ibm_type, msg->ibm_u.rdma.ibrm_cookie,
                       msg->ibm_u.rdma.ibrm_desc.rd_key,
                       msg->ibm_u.rdma.ibrm_desc.rd_addr,
                       msg->ibm_u.rdma.ibrm_desc.rd_nob);
                break;
                
        case IBNAL_MSG_PUT_DONE:
        case IBNAL_MSG_GET_DONE:
                CDEBUG(D_NET, "%d DONE: cookie "LPX64", status %d\n",
                       msg->ibm_type, msg->ibm_u.completion.ibcm_cookie,
                       msg->ibm_u.completion.ibcm_status);

                kibnal_complete_passive_rdma (conn, 
                                              msg->ibm_u.completion.ibcm_cookie,
                                              msg->ibm_u.completion.ibcm_status);

                if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {
                        kibnal_post_rx (rx, 1, 0);
                } else {
                        /* this reply buffer was pre-reserved */
                        kibnal_post_rx (rx, 0, 1);
                }
                return;
                        
        default:
                CERROR ("Bad msg type %x from %s\n",
                        msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                goto failed;
        }

        kibnal_peer_alive(conn->ibc_peer);

        /* schedule for kibnal_rx() in thread context */
        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
        
        list_add_tail (&rx->rx_list, &kibnal_data.kib_sched_rxq);
        wake_up (&kibnal_data.kib_sched_waitq);
        
        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
        return;
        
 failed:
        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        kibnal_close_conn(conn, err);

        /* Don't re-post rx & drop its ref on conn */
        kibnal_conn_decref(conn);
}
387
388 void
389 kibnal_rx (kib_rx_t *rx)
390 {
391         int          rc = 0;
392         kib_msg_t   *msg = rx->rx_msg;
393
394         switch (msg->ibm_type) {
395         case IBNAL_MSG_GET_RDMA:
396                 rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.rdma.ibrm_hdr,
397                                 msg->ibm_srcnid, rx, 1);
398                 break;
399                 
400         case IBNAL_MSG_PUT_RDMA:
401                 rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.rdma.ibrm_hdr,
402                                 msg->ibm_srcnid, rx, 1);
403                 break;
404
405         case IBNAL_MSG_IMMEDIATE:
406                 rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.immediate.ibim_hdr,
407                                 msg->ibm_srcnid, rx, 0);
408                 break;
409
410         default:
411                 LBUG();
412                 break;
413         }
414
415         if (rc < 0) {
416                 kibnal_close_conn(rx->rx_conn, rc);
417                 kibnal_post_rx (rx, 1, 0);
418         }
419 }
420
#if 0
/* Translate a kernel virtual address to a physical address, handling
 * vmalloc and (2.4 highmem) pkmap ranges as well as direct-mapped
 * memory.  Currently compiled out.
 * NOTE(review): '#if CONFIG_HIGHMEM' relies on the config symbol being
 * defined as a non-zero value; '#ifdef' is the usual kernel idiom --
 * confirm before re-enabling this code. */
int
kibnal_kvaddr_to_phys (unsigned long vaddr, __u64 *physp)
{
        struct page *page;

        if (vaddr >= VMALLOC_START &&
            vaddr < VMALLOC_END)
                page = vmalloc_to_page ((void *)vaddr);
#if CONFIG_HIGHMEM
        else if (vaddr >= PKMAP_BASE &&
                 vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
                page = vmalloc_to_page ((void *)vaddr);
        /* in 2.4 ^ just walks the page tables */
#endif
        else
                page = virt_to_page (vaddr);

        if (page == NULL ||
            !VALID_PAGE (page))
                return (-EFAULT);

        /* physical page base + byte offset within the page */
        *physp = lnet_page2phys(page) + (vaddr & (PAGE_SIZE - 1));
        return (0);
}
#endif
447
448 int
449 kibnal_map_iov (kib_tx_t *tx, int access,
450                 unsigned int niov, struct iovec *iov, int offset, int nob)
451                  
452 {
453         void   *vaddr;
454         int     rc;
455
456         LASSERT (nob > 0);
457         LASSERT (niov > 0);
458         LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
459
460         while (offset >= iov->iov_len) {
461                 offset -= iov->iov_len;
462                 niov--;
463                 iov++;
464                 LASSERT (niov > 0);
465         }
466
467         if (nob > iov->iov_len - offset) {
468                 CERROR ("Can't map multiple vaddr fragments\n");
469                 return (-EMSGSIZE);
470         }
471
472         vaddr = (void *)(((unsigned long)iov->iov_base) + offset);
473         tx->tx_md.md_addr = (__u64)((unsigned long)vaddr);
474
475         rc = ib_memory_register (kibnal_data.kib_pd,
476                                  vaddr, nob,
477                                  access,
478                                  &tx->tx_md.md_handle.mr,
479                                  &tx->tx_md.md_lkey,
480                                  &tx->tx_md.md_rkey);
481         
482         if (rc != 0) {
483                 CERROR ("Can't map vaddr: %d\n", rc);
484                 return (rc);
485         }
486
487         tx->tx_mapped = KIB_TX_MAPPED;
488         return (0);
489 }
490
/* Register the page (kiov) fragments covering [offset, offset+nob)
 * with the IB device.  The pages must form one contiguous region in
 * I/O space: interior fragments must be whole pages (no gaps), or the
 * mapping fails with -EINVAL.  Builds a temporary physical-address
 * array and registers it either through the FMR pool (IBNAL_FMR) or
 * as a plain physical memory region.  On success tx->tx_md holds the
 * mapping and tx->tx_mapped records which kind it is. */
int
kibnal_map_kiov (kib_tx_t *tx, int access,
                  int nkiov, lnet_kiov_t *kiov,
                  int offset, int nob)
{
#if IBNAL_FMR
        __u64                      *phys;
        const int                   mapped = KIB_TX_MAPPED_FMR;
#else
        struct ib_physical_buffer  *phys;
        const int                   mapped = KIB_TX_MAPPED;
#endif
        int                         page_offset;
        int                         nphys;
        int                         resid;
        int                         phys_size;
        int                         rc;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);

        /* skip whole fragments preceding 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        /* temporary array of physical addresses, one per fragment */
        phys_size = nkiov * sizeof (*phys);
        LIBCFS_ALLOC(phys, phys_size);
        if (phys == NULL) {
                CERROR ("Can't allocate tmp phys\n");
                return (-ENOMEM);
        }

        page_offset = kiov->kiov_offset + offset;
#if IBNAL_FMR
        phys[0] = lnet_page2phys(kiov->kiov_page);
#else
        phys[0].address = lnet_page2phys(kiov->kiov_page);
        phys[0].size = PAGE_SIZE;
#endif
        nphys = 1;
        resid = nob - (kiov->kiov_len - offset);

        /* accumulate the remaining fragments, checking contiguity */
        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) && 
                     kiov->kiov_len < PAGE_SIZE)) {
                        int i;
                        /* Can't have gaps */
                        CERROR ("Can't make payload contiguous in I/O VM:"
                                "page %d, offset %d, len %d \n", nphys, 
                                kiov->kiov_offset, kiov->kiov_len);

                        for (i = -nphys; i < nkiov; i++) 
                        {
                                CERROR("kiov[%d] %p +%d for %d\n",
                                       i, kiov[i].kiov_page, kiov[i].kiov_offset, kiov[i].kiov_len);
                        }
                        
                        rc = -EINVAL;
                        goto out;
                }

                if (nphys == LNET_MAX_IOV) {
                        CERROR ("payload too big (%d)\n", nphys);
                        rc = -EMSGSIZE;
                        goto out;
                }

                LASSERT (nphys * sizeof (*phys) < phys_size);
#if IBNAL_FMR
                phys[nphys] = lnet_page2phys(kiov->kiov_page);
#else
                phys[nphys].address = lnet_page2phys(kiov->kiov_page);
                phys[nphys].size = PAGE_SIZE;
#endif
                nphys++;

                resid -= PAGE_SIZE;
        }

        tx->tx_md.md_addr = IBNAL_RDMA_BASE;

#if IBNAL_FMR
        rc = ib_fmr_register_physical (kibnal_data.kib_fmr_pool,
                                       phys, nphys,
                                       &tx->tx_md.md_addr,
                                       page_offset,
                                       &tx->tx_md.md_handle.fmr,
                                       &tx->tx_md.md_lkey,
                                       &tx->tx_md.md_rkey);
#else
        rc = ib_memory_register_physical (kibnal_data.kib_pd,
                                          phys, nphys,
                                          &tx->tx_md.md_addr,
                                          nob, page_offset,
                                          access,
                                          &tx->tx_md.md_handle.mr,
                                          &tx->tx_md.md_lkey,
                                          &tx->tx_md.md_rkey);
#endif
        if (rc == 0) {
                CDEBUG(D_NET, "Mapped %d pages %d bytes @ offset %d: lkey %x, rkey %x\n",
                       nphys, nob, page_offset, tx->tx_md.md_lkey, tx->tx_md.md_rkey);
                tx->tx_mapped = mapped;
        } else {
                CERROR ("Can't map phys: %d\n", rc);
                rc = -EFAULT;
        }

 out:
        LIBCFS_FREE(phys, phys_size);
        return (rc);
}
614
615 kib_conn_t *
616 kibnal_find_conn_locked (kib_peer_t *peer)
617 {
618         struct list_head *tmp;
619
620         /* just return the first connection */
621         list_for_each (tmp, &peer->ibp_conns) {
622                 return (list_entry(tmp, kib_conn_t, ibc_list));
623         }
624
625         return (NULL);
626 }
627
/* The connection's send engine: move reserved-credit txs onto the main
 * queue, inject a NOOP if credits need returning and nothing else is
 * queued, then post queued txs for as long as flow-control credits and
 * QP send slots allow.  Drops and retakes ibc_lock around anything
 * that can sleep or re-enter (idle-tx allocation, posting, tx_done). */
void
kibnal_check_sends (kib_conn_t *conn)
{
        unsigned long   flags;
        kib_tx_t       *tx;
        int             rc;
        int             i;
        int             consume_credit;
        int             done;
        int             nwork;

        spin_lock_irqsave (&conn->ibc_lock, flags);

        LASSERT (conn->ibc_nsends_posted <= IBNAL_RX_MSGS);
        LASSERT (conn->ibc_reserved_credits >= 0);

        /* promote reserved-credit txs while reserved credits last */
        while (conn->ibc_reserved_credits > 0 &&
               !list_empty(&conn->ibc_tx_queue_rsrvd)) {
                LASSERT (conn->ibc_version !=
                         IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
                tx = list_entry(conn->ibc_tx_queue_rsrvd.next,
                                kib_tx_t, tx_list);
                list_del(&tx->tx_list);
                list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
                conn->ibc_reserved_credits--;
        }

        /* nothing queued but credits (or a keepalive) need to go back?
         * queue a NOOP to carry them */
        if (list_empty(&conn->ibc_tx_queue) &&
            list_empty(&conn->ibc_tx_queue_nocred) &&
            (conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER ||
             kibnal_send_keepalive(conn))) {
                spin_unlock_irqrestore(&conn->ibc_lock, flags);
                
                tx = kibnal_get_idle_tx();
                if (tx != NULL)
                        kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);

                spin_lock_irqsave(&conn->ibc_lock, flags);
                
                if (tx != NULL)
                        kibnal_queue_tx_locked(tx, conn);
        }

        for (;;) {
                /* no-credit txs take priority over credit-consuming ones */
                if (!list_empty(&conn->ibc_tx_queue_nocred)) {
                        LASSERT (conn->ibc_version !=
                                 IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
                        tx = list_entry(conn->ibc_tx_queue_nocred.next,
                                        kib_tx_t, tx_list);
                        consume_credit = 0;
                } else if (!list_empty (&conn->ibc_tx_queue)) {
                        tx = list_entry (conn->ibc_tx_queue.next, 
                                         kib_tx_t, tx_list);
                        consume_credit = 1;
                } else {
                        /* nothing waiting */
                        break;
                }

                /* We rely on this for QP sizing */
                LASSERT (tx->tx_nsp > 0 && tx->tx_nsp <= 2);

                LASSERT (conn->ibc_outstanding_credits >= 0);
                LASSERT (conn->ibc_outstanding_credits <= IBNAL_MSG_QUEUE_SIZE);
                LASSERT (conn->ibc_credits >= 0);
                LASSERT (conn->ibc_credits <= IBNAL_MSG_QUEUE_SIZE);

                /* Not on ibc_rdma_queue */
                LASSERT (!tx->tx_passive_rdma_wait);

                /* QP send queue full? */
                if (conn->ibc_nsends_posted == IBNAL_RX_MSGS)
                        break;

                if (consume_credit) {
                        if (conn->ibc_credits == 0)     /* no credits */
                                break;
                
                        if (conn->ibc_credits == 1 &&   /* last credit reserved for */
                            conn->ibc_outstanding_credits == 0) /* giving back credits */
                                break;
                }
                
                list_del (&tx->tx_list);

                if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
                    (!list_empty(&conn->ibc_tx_queue) ||
                     !list_empty(&conn->ibc_tx_queue_nocred) ||
                     (conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER &&
                      !kibnal_send_keepalive(conn)))) {
                        /* redundant NOOP */
                        spin_unlock_irqrestore(&conn->ibc_lock, flags);
                        kibnal_tx_done(tx);
                        spin_lock_irqsave(&conn->ibc_lock, flags);
                        continue;
                }

                /* stamp the message with the credits being returned */
                kibnal_pack_msg(tx->tx_msg, conn->ibc_version,
                                conn->ibc_outstanding_credits,
                                conn->ibc_peer->ibp_nid, conn->ibc_incarnation);

                conn->ibc_outstanding_credits = 0;
                conn->ibc_nsends_posted++;
                if (consume_credit)
                        conn->ibc_credits--;

                tx->tx_sending = tx->tx_nsp;
                tx->tx_passive_rdma_wait = tx->tx_passive_rdma;
                list_add (&tx->tx_list, &conn->ibc_active_txs);

                spin_unlock_irqrestore (&conn->ibc_lock, flags);

                /* NB the gap between removing tx from the queue and sending it
                 * allows message re-ordering to occur */

                LASSERT (tx->tx_nsp > 0);

                rc = -ECONNABORTED;
                nwork = 0;
                if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                        tx->tx_status = 0;
                        /* Driver only accepts 1 item at a time */
                        for (i = 0; i < tx->tx_nsp; i++) {
                                rc = kibnal_ib_send(conn->ibc_qp, &tx->tx_sp[i]);
                                if (rc != 0)
                                        break;
                                nwork++;
                        }
                }

                conn->ibc_last_send = jiffies;

                spin_lock_irqsave (&conn->ibc_lock, flags);
                if (rc != 0) {
                        /* posting failed: undo the accounting done above */
                        /* NB credits are transferred in the actual
                         * message, which can only be the last work item */
                        conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
                        if (consume_credit)
                                conn->ibc_credits++;
                        conn->ibc_nsends_posted--;

                        tx->tx_status = rc;
                        tx->tx_passive_rdma_wait = 0;
                        tx->tx_sending -= tx->tx_nsp - nwork;

                        done = (tx->tx_sending == 0);
                        if (done)
                                list_del (&tx->tx_list);
                        
                        spin_unlock_irqrestore (&conn->ibc_lock, flags);
                        
                        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
                                CERROR ("Error %d posting transmit to %s\n", 
                                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        else
                                CDEBUG (D_NET, "Error %d posting transmit to %s\n",
                                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));

                        kibnal_close_conn (conn, rc);

                        if (done)
                                kibnal_tx_done (tx);
                        return;
                }
                
        }

        spin_unlock_irqrestore (&conn->ibc_lock, flags);
}
796
/* Completion handler for one work item of a posted send.  Decrements
 * the tx's outstanding-send count; the invocation that makes the tx
 * idle (no sends left, not awaiting passive RDMA) removes it from the
 * active list and completes it.  On completion error the conn is
 * closed; on success it's a chance to push more queued sends. */
void
kibnal_tx_callback (struct ib_cq_entry *e)
{
        kib_tx_t     *tx = (kib_tx_t *)kibnal_wreqid2ptr(e->work_request_id);
        kib_conn_t   *conn;
        unsigned long flags;
        int           idle;

        conn = tx->tx_conn;
        LASSERT (conn != NULL);
        LASSERT (tx->tx_sending != 0);

        spin_lock_irqsave(&conn->ibc_lock, flags);

        CDEBUG(D_NET, "conn %p tx %p [%d/%d]: %d\n", conn, tx,
               tx->tx_nsp - tx->tx_sending, tx->tx_nsp,
               e->status);

        /* I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'.  If it's
         * not me, then I take an extra ref on conn so it can't disappear
         * under me. */

        tx->tx_sending--;
        idle = (tx->tx_sending == 0) &&         /* This is the final callback */
               (!tx->tx_passive_rdma_wait);     /* Not waiting for RDMA completion */
        if (idle)
                list_del(&tx->tx_list);

        kibnal_conn_addref(conn);

        if (tx->tx_sending == 0)
                conn->ibc_nsends_posted--;

        /* record the first failure; don't overwrite an earlier one */
        if (e->status != IB_COMPLETION_STATUS_SUCCESS &&
            tx->tx_status == 0)
                tx->tx_status = -ECONNABORTED;
                
        spin_unlock_irqrestore(&conn->ibc_lock, flags);

        if (idle)
                kibnal_tx_done (tx);

        if (e->status != IB_COMPLETION_STATUS_SUCCESS) {
                CDEBUG (D_NETERROR, "Tx completion to %s failed: %d\n", 
                        libcfs_nid2str(conn->ibc_peer->ibp_nid), e->status);
                kibnal_close_conn (conn, -ENETDOWN);
        } else {
                kibnal_peer_alive(conn->ibc_peer);
                /* can I shovel some more sends out the door? */
                kibnal_check_sends(conn);
        }

        kibnal_conn_decref(conn);
}
852
853 void
854 kibnal_callback (ib_cq_t *cq, struct ib_cq_entry *e, void *arg)
855 {
856         if (kibnal_wreqid_is_rx(e->work_request_id))
857                 kibnal_rx_callback (e);
858         else
859                 kibnal_tx_callback (e);
860 }
861
862 void
863 kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob)
864 {
865         struct ib_gather_scatter *gl = &tx->tx_gl[tx->tx_nsp];
866         struct ib_send_param     *sp = &tx->tx_sp[tx->tx_nsp];
867         int                       fence;
868         int                       nob = offsetof (kib_msg_t, ibm_u) + body_nob;
869
870         LASSERT (tx->tx_nsp >= 0 && 
871                  tx->tx_nsp < sizeof(tx->tx_sp)/sizeof(tx->tx_sp[0]));
872         LASSERT (nob <= IBNAL_MSG_SIZE);
873
874         kibnal_init_msg(tx->tx_msg, type, body_nob);
875
876         /* Fence the message if it's bundled with an RDMA read */
877         fence = (tx->tx_nsp > 0) &&
878                 (type == IBNAL_MSG_PUT_DONE);
879
880         *gl = (struct ib_gather_scatter) {
881                 .address = tx->tx_vaddr,
882                 .length  = nob,
883                 .key     = kibnal_data.kib_tx_pages->ibp_lkey,
884         };
885
886         /* NB If this is an RDMA read, the completion message must wait for
887          * the RDMA to complete.  Sends wait for previous RDMA writes
888          * anyway... */
889         *sp = (struct ib_send_param) {
890                 .work_request_id      = kibnal_ptr2wreqid(tx, 0),
891                 .op                   = IB_OP_SEND,
892                 .gather_list          = gl,
893                 .num_gather_entries   = 1,
894                 .device_specific      = NULL,
895                 .solicited_event      = 1,
896                 .signaled             = 1,
897                 .immediate_data_valid = 0,
898                 .fence                = fence,
899                 .inline_data          = 0,
900         };
901
902         tx->tx_nsp++;
903 }
904
905 void
906 kibnal_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
907 {
908         unsigned long         flags;
909
910         spin_lock_irqsave(&conn->ibc_lock, flags);
911
912         kibnal_queue_tx_locked (tx, conn);
913         
914         spin_unlock_irqrestore(&conn->ibc_lock, flags);
915         
916         kibnal_check_sends(conn);
917 }
918
void
kibnal_schedule_active_connect_locked (kib_peer_t *peer)
{
        /* Called with exclusive kib_global_lock */

        /* Hand 'peer' to the connection daemon to initiate an active
         * connect.  The connd's reference is taken here; it is the
         * connd's job to drop it when the attempt completes. */
        peer->ibp_connecting++;
        kibnal_peer_addref(peer); /* extra ref for connd */
        
        spin_lock (&kibnal_data.kib_connd_lock);
        
        LASSERT (list_empty(&peer->ibp_connd_list));
        list_add_tail (&peer->ibp_connd_list,
                       &kibnal_data.kib_connd_peers);
        wake_up (&kibnal_data.kib_connd_waitq);
        
        spin_unlock (&kibnal_data.kib_connd_lock);
}
936
void
kibnal_launch_tx (kib_tx_t *tx, lnet_nid_t nid)
{
        /* Send 'tx' to 'nid': queue it on an existing connection if one
         * is ready, otherwise schedule a connection attempt and park the
         * tx on the peer's queue.  Once here we're committed: any failure
         * completes the tx with an error via kibnal_tx_done(). */
        unsigned long    flags;
        kib_peer_t      *peer;
        kib_conn_t      *conn;
        int              retry;
        int              rc;
        rwlock_t        *g_lock = &kibnal_data.kib_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */
        
        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
        LASSERT (tx->tx_nsp > 0);               /* work items have been set up */

        /* at most one retry: second pass only after adding the peer */
        for (retry = 0; ; retry = 1) {
                read_lock_irqsave(g_lock, flags);
        
                peer = kibnal_find_peer_locked (nid);
                if (peer != NULL) {
                        conn = kibnal_find_conn_locked (peer);
                        if (conn != NULL) {
                                kibnal_conn_addref(conn); /* 1 ref for me...*/
                                read_unlock_irqrestore(g_lock, flags);
                
                                kibnal_queue_tx (tx, conn);
                                kibnal_conn_decref(conn); /* ...until here */
                                return;
                        }
                }
                
                /* Making one or more connections; I'll need a write lock... */
                /* NB lock "upgrade": drop the read lock WITHOUT restoring
                 * irqs, take the write lock, and reuse 'flags' (saved by
                 * read_lock_irqsave above) at write_unlock_irqrestore.
                 * Another thread may get in between the two locks, hence
                 * the re-lookup below. */
                read_unlock(g_lock);
                write_lock(g_lock);

                peer = kibnal_find_peer_locked (nid);
                if (peer != NULL)
                        break;
                
                write_unlock_irqrestore (g_lock, flags);

                if (retry) {
                        /* peer vanished even after a successful add */
                        CERROR("Can't find peer %s\n", libcfs_nid2str(nid));
                        tx->tx_status = -EHOSTUNREACH;
                        kibnal_tx_done (tx);
                        return;
                }

                rc = kibnal_add_persistent_peer(nid, LNET_NIDADDR(nid),
                                                lnet_acceptor_port());
                if (rc != 0) {
                        CERROR("Can't add peer %s: %d\n",
                               libcfs_nid2str(nid), rc);
                        tx->tx_status = rc;
                        kibnal_tx_done(tx);
                        return;
                }
        }

        /* Here with the write lock held and 'peer' found */
        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kibnal_conn_addref(conn);       /* +1 ref from me... */
                write_unlock_irqrestore (g_lock, flags);
                
                kibnal_queue_tx (tx, conn);
                kibnal_conn_decref(conn);       /* ...until here */
                return;
        }

        if (peer->ibp_connecting == 0 &&
            peer->ibp_accepting == 0) {
                /* no connection attempt in progress: start one, unless
                 * we're still inside the reconnection backoff window */
                if (!(peer->ibp_reconnect_interval == 0 || /* first attempt */
                      time_after_eq(jiffies, peer->ibp_reconnect_time))) {
                        write_unlock_irqrestore (g_lock, flags);
                        tx->tx_status = -EHOSTUNREACH;
                        kibnal_tx_done (tx);
                        return;
                }
        
                kibnal_schedule_active_connect_locked(peer);
        }
        
        /* A connection is being established; queue the message... */
        list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);

        write_unlock_irqrestore (g_lock, flags);
}
1026
1027 void
1028 kibnal_txlist_done (struct list_head *txlist, int status)
1029 {
1030         kib_tx_t *tx;
1031
1032         while (!list_empty(txlist)) {
1033                 tx = list_entry (txlist->next, kib_tx_t, tx_list);
1034
1035                 list_del (&tx->tx_list);
1036                 /* complete now */
1037                 tx->tx_status = status;
1038                 kibnal_tx_done (tx);
1039         }
1040 }
1041
int
kibnal_start_passive_rdma (int type, lnet_msg_t *lntmsg,
                           int niov, struct iovec *iov, lnet_kiov_t *kiov,
                           int nob)
{
        /* Set up 'lntmsg' as the passive side of an RDMA: map its buffer,
         * advertise the rkey/addr/nob to the peer in a PUT_RDMA or
         * GET_RDMA message, and let the peer do the data movement.
         * Returns 0 on success; on failure the tx (if allocated) is
         * completed locally and -ENOMEM/-EIO returned. */
        lnet_nid_t  nid = lntmsg->msg_target.nid;
        kib_tx_t   *tx;
        kib_msg_t  *ibmsg;
        int         rc;
        int         access;
        
        LASSERT (type == IBNAL_MSG_PUT_RDMA || 
                 type == IBNAL_MSG_GET_RDMA);
        LASSERT (nob > 0);
        LASSERT (!in_interrupt());              /* Mapping could block */

        /* PUT: peer will READ from our buffer; GET: peer will WRITE into
         * it (which also needs local write access) */
        if (type == IBNAL_MSG_PUT_RDMA) {
                access = IB_ACCESS_REMOTE_READ;
        } else {
                access = IB_ACCESS_REMOTE_WRITE |
                         IB_ACCESS_LOCAL_WRITE;
        }

        tx = kibnal_get_idle_tx ();
        if (tx == NULL) {
                CERROR("Can't allocate %s txd for %s\n",
                       (type == IBNAL_MSG_PUT_RDMA) ? "PUT/REPLY" : "GET",
                       libcfs_nid2str(nid));
                return -ENOMEM;
        }

        
        if (iov != NULL) 
                rc = kibnal_map_iov (tx, access, niov, iov, 0, nob);
        else
                rc = kibnal_map_kiov (tx, access, niov, kiov, 0, nob);

        if (rc != 0) {
                CERROR ("Can't map RDMA for %s: %d\n", 
                        libcfs_nid2str(nid), rc);
                goto failed;
        }
        
        if (type == IBNAL_MSG_GET_RDMA) {
                /* reply gets finalized when tx completes */
                tx->tx_lntmsg[1] = lnet_create_reply_msg(kibnal_data.kib_ni, 
                                                         lntmsg);
                if (tx->tx_lntmsg[1] == NULL) {
                        CERROR ("Can't create reply for GET -> %s\n",
                                libcfs_nid2str(nid));
                        rc = -ENOMEM;
                        goto failed;
                }
        }
        
        /* tx stays busy until the peer's completion message arrives */
        tx->tx_passive_rdma = 1;

        ibmsg = tx->tx_msg;

        /* advertise the mapped buffer to the peer */
        ibmsg->ibm_u.rdma.ibrm_hdr = lntmsg->msg_hdr;
        ibmsg->ibm_u.rdma.ibrm_cookie = tx->tx_passive_rdma_cookie;
        ibmsg->ibm_u.rdma.ibrm_desc.rd_key = tx->tx_md.md_rkey;
        ibmsg->ibm_u.rdma.ibrm_desc.rd_addr = tx->tx_md.md_addr;
        ibmsg->ibm_u.rdma.ibrm_desc.rd_nob = nob;

        kibnal_init_tx_msg (tx, type, sizeof (kib_rdma_msg_t));

        CDEBUG(D_NET, "Passive: %p cookie "LPX64", key %x, addr "
               LPX64", nob %d\n",
               tx, tx->tx_passive_rdma_cookie, tx->tx_md.md_rkey,
               tx->tx_md.md_addr, nob);
        
        /* lntmsg gets finalized when tx completes. */
        tx->tx_lntmsg[0] = lntmsg;

        kibnal_launch_tx(tx, nid);
        return (0);

 failed:
        /* tx_done finalizes lntmsg (and the reply, if created) with
         * tx_status = rc; caller sees a generic -EIO */
        tx->tx_status = rc;
        kibnal_tx_done (tx);
        return (-EIO);
}
1125
void
kibnal_start_active_rdma (int type, int status,
                          kib_rx_t *rx, lnet_msg_t *lntmsg, 
                          unsigned int niov,
                          struct iovec *iov, lnet_kiov_t *kiov,
                          int offset, int nob)
{
        /* Perform the active side of an RDMA advertised by the peer in
         * 'rx' (a GET_RDMA or PUT_RDMA): map the local buffer, post the
         * RDMA read/write against the peer's descriptor, and bundle a
         * GET_DONE/PUT_DONE completion message carrying 'status'. */
        kib_msg_t    *rxmsg = rx->rx_msg;
        kib_msg_t    *txmsg;
        kib_tx_t     *tx;
        int           access;
        int           rdma_op;
        int           rc;

        CDEBUG(D_NET, "type %d, status %d, niov %d, offset %d, nob %d\n",
               type, status, niov, offset, nob);

        /* Called by scheduler */
        LASSERT (!in_interrupt ());

        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        /* No data if we're completing with failure */
        LASSERT (status == 0 || nob == 0);

        LASSERT (type == IBNAL_MSG_GET_DONE ||
                 type == IBNAL_MSG_PUT_DONE);

        /* GET_DONE: we RDMA-write our data to the peer; PUT_DONE: we
         * RDMA-read the peer's data into our buffer (needs local write) */
        if (type == IBNAL_MSG_GET_DONE) {
                access   = 0;
                rdma_op  = IB_OP_RDMA_WRITE;
                LASSERT (rxmsg->ibm_type == IBNAL_MSG_GET_RDMA);
        } else {
                access   = IB_ACCESS_LOCAL_WRITE;
                rdma_op  = IB_OP_RDMA_READ;
                LASSERT (rxmsg->ibm_type == IBNAL_MSG_PUT_RDMA);
        }

        tx = kibnal_get_idle_tx ();
        if (tx == NULL) {
                CERROR ("tx descs exhausted on RDMA from %s"
                        " completing locally with failure\n",
                        libcfs_nid2str(rx->rx_conn->ibc_peer->ibp_nid));
                lnet_finalize (kibnal_data.kib_ni, lntmsg, -ENOMEM);
                return;
        }
        LASSERT (tx->tx_nsp == 0);
                        
        if (nob != 0) {
                /* We actually need to transfer some data (the transfer
                 * size could get truncated to zero when the incoming
                 * message is matched) */

                if (kiov != NULL)
                        rc = kibnal_map_kiov (tx, access,
                                              niov, kiov, offset, nob);
                else
                        rc = kibnal_map_iov (tx, access,
                                             niov, iov, offset, nob);
                
                if (rc != 0) {
                        CERROR ("Can't map RDMA -> %s: %d\n", 
                                libcfs_nid2str(rx->rx_conn->ibc_peer->ibp_nid), 
                                rc);
                        /* We'll skip the RDMA and complete with failure. */
                        status = rc;
                        nob = 0;
                } else {
                        /* work item 0: the RDMA itself, targeting the
                         * peer's advertised rkey/addr from the rx */
                        tx->tx_gl[0] = (struct ib_gather_scatter) {
                                .address = tx->tx_md.md_addr,
                                .length  = nob,
                                .key     = tx->tx_md.md_lkey,
                        };
                
                        tx->tx_sp[0] = (struct ib_send_param) {
                                .work_request_id      = kibnal_ptr2wreqid(tx, 0),
                                .op                   = rdma_op,
                                .gather_list          = &tx->tx_gl[0],
                                .num_gather_entries   = 1,
                                .remote_address       = rxmsg->ibm_u.rdma.ibrm_desc.rd_addr,
                                .rkey                 = rxmsg->ibm_u.rdma.ibrm_desc.rd_key,
                                .device_specific      = NULL,
                                .solicited_event      = 0,
                                .signaled             = 1,
                                .immediate_data_valid = 0,
                                .fence                = 0,
                                .inline_data          = 0,
                        };

                        tx->tx_nsp = 1;
                }
        }

        txmsg = tx->tx_msg;

        /* completion message echoes the peer's cookie so it can match
         * this RDMA to its passive tx */
        txmsg->ibm_u.completion.ibcm_cookie = rxmsg->ibm_u.rdma.ibrm_cookie;
        txmsg->ibm_u.completion.ibcm_status = status;
        
        kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));

        if (status == 0 && nob != 0) {
                LASSERT (tx->tx_nsp > 1);
                /* RDMA: lntmsg gets finalized when the tx completes.  This
                 * is after the completion message has been sent, which in
                 * turn is after the RDMA has finished. */
                tx->tx_lntmsg[0] = lntmsg;
        } else {
                LASSERT (tx->tx_nsp == 1);
                /* No RDMA: local completion happens now! */
                CDEBUG(D_NET, "No data: immediate completion\n");
                lnet_finalize (kibnal_data.kib_ni, lntmsg,
                              status == 0 ? 0 : -EIO);
        }

        kibnal_queue_tx(tx, rx->rx_conn);
}
1243
int
kibnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        /* LNET send entry point: choose between a (passive) RDMA setup
         * and an IMMEDIATE message with inline payload, depending on
         * message type and payload size.  Returns 0 on success or a
         * negative errno; lntmsg is finalized when the tx completes. */
        lnet_hdr_t       *hdr = &lntmsg->msg_hdr; 
        int               type = lntmsg->msg_type; 
        lnet_process_id_t target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      payload_niov = lntmsg->msg_niov; 
        struct iovec     *payload_iov = lntmsg->msg_iov; 
        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
        unsigned int      payload_offset = lntmsg->msg_offset;
        unsigned int      payload_nob = lntmsg->msg_len;
        kib_msg_t        *ibmsg;
        kib_tx_t         *tx;
        int               nob;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
               payload_nob, payload_niov, libcfs_id2str(target));

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= LNET_MAX_IOV);

        /* Thread context if we're sending payload */
        LASSERT (!in_interrupt() || payload_niov == 0);
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        switch (type) {
        default:
                LBUG();
                return (-EIO);
                
        case LNET_MSG_ACK:
                LASSERT (payload_nob == 0);
                break;

        case LNET_MSG_GET:
                /* routers can't RDMA on our behalf; send the GET inline */
                if (routing || target_is_router)
                        break;                  /* send IMMEDIATE */

                /* is the REPLY message too small for RDMA? */
                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
                if (nob <= IBNAL_MSG_SIZE)
                        break;                  /* send IMMEDIATE */

                /* REPLY is large: set up a GET_RDMA so the peer writes
                 * the reply data directly into our MD's buffer */
                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
                        return kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA, lntmsg, 
                                                         lntmsg->msg_md->md_niov, 
                                                         lntmsg->msg_md->md_iov.iov, NULL,
                                                         lntmsg->msg_md->md_length);

                return kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA, lntmsg, 
                                                 lntmsg->msg_md->md_niov, 
                                                 NULL, lntmsg->msg_md->md_iov.kiov,
                                                 lntmsg->msg_md->md_length);

        case LNET_MSG_REPLY:
        case LNET_MSG_PUT:
                /* Is the payload small enough not to need RDMA? */
                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
                if (nob <= IBNAL_MSG_SIZE)
                        break;                  /* send IMMEDIATE */
                
                /* large payload: advertise it and let the peer read it */
                return kibnal_start_passive_rdma(IBNAL_MSG_PUT_RDMA, lntmsg,
                                                 payload_niov,
                                                 payload_iov, payload_kiov,
                                                 payload_nob);
        }

        /* Send IMMEDIATE */

        tx = kibnal_get_idle_tx();
        if (tx == NULL) {
                CERROR ("Can't send %d to %s: tx descs exhausted%s\n", 
                        type, libcfs_nid2str(target.nid), 
                        in_interrupt() ? " (intr)" : "");
                return (-ENOMEM);
        }

        ibmsg = tx->tx_msg;
        ibmsg->ibm_u.immediate.ibim_hdr = *hdr;

        /* copy the payload inline after the LNET header */
        if (payload_kiov != NULL)
                lnet_copy_kiov2flat(IBNAL_MSG_SIZE, ibmsg,
                                    offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                    payload_niov, payload_kiov, 
                                    payload_offset, payload_nob);
        else
                lnet_copy_iov2flat(IBNAL_MSG_SIZE, ibmsg,
                                   offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                   payload_niov, payload_iov, 
                                   payload_offset, payload_nob);

        kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE,
                            offsetof(kib_immediate_msg_t, 
                                     ibim_payload[payload_nob]));

        /* lntmsg gets finalized when tx completes */
        tx->tx_lntmsg[0] = lntmsg;

        kibnal_launch_tx(tx, target.nid);
        return (0);
}
1350
1351 int
1352 kibnal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
1353                    void **new_private)
1354 {
1355         kib_rx_t    *rx = private;
1356         kib_conn_t  *conn = rx->rx_conn;
1357
1358         if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {
1359                 /* Can't block if RDMA completions need normal credits */
1360                 LCONSOLE_ERROR_MSG(0x12a, 
1361                                "Dropping message from %s: no buffers free. "
1362                                "%s is running an old version of LNET that may "
1363                                "deadlock if messages wait for buffers)\n",
1364                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
1365                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
1366                 return -EDEADLK;
1367         }
1368         
1369         *new_private = private;
1370         return 0;
1371 }
1372
int
kibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
             int delayed, unsigned int niov,
             struct iovec *iov, lnet_kiov_t *kiov,
             unsigned int offset, unsigned int mlen, unsigned int rlen)
{
        /* LNET recv entry point: deliver the payload of the rx in
         * 'private' into the caller's buffers.  IMMEDIATE payloads are
         * copied inline; GET/PUT_RDMA messages kick off the active side
         * of the advertised RDMA.  Always reposts the rx buffer. */
        kib_rx_t    *rx = private;
        kib_msg_t   *rxmsg = rx->rx_msg;
        int          msg_nob;
        int          rc = 0;
        
        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt ());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch (rxmsg->ibm_type) {
        default:
                LBUG();

        case IBNAL_MSG_IMMEDIATE:
                /* sanity-check the sender's claimed payload size against
                 * what was actually received */
                msg_nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
                if (msg_nob > rx->rx_nob) {
                        CERROR ("Immediate message from %s too big: %d(%d)\n",
                                libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
                                msg_nob, rx->rx_nob);
                        rc = -EPROTO;
                        break;
                }

                if (kiov != NULL)
                        lnet_copy_flat2kiov(
                                niov, kiov, offset, 
                                IBNAL_MSG_SIZE, rxmsg,
                                offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                mlen);
                else
                        lnet_copy_flat2iov(
                                niov, iov, offset,
                                IBNAL_MSG_SIZE, rxmsg,
                                offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                mlen);

                lnet_finalize (ni, lntmsg, 0);
                break;

        case IBNAL_MSG_GET_RDMA:
                if (lntmsg != NULL) {
                        /* GET matched: RDMA lntmsg's payload */
                        kibnal_start_active_rdma(IBNAL_MSG_GET_DONE, 0,
                                                 rx, lntmsg, 
                                                 lntmsg->msg_niov, 
                                                 lntmsg->msg_iov, 
                                                 lntmsg->msg_kiov,
                                                 lntmsg->msg_offset, 
                                                 lntmsg->msg_len);
                } else {
                        /* GET didn't match anything */
                        kibnal_start_active_rdma (IBNAL_MSG_GET_DONE, -ENODATA,
                                                  rx, NULL, 0, NULL, NULL, 0, 0);
                }
                break;

        case IBNAL_MSG_PUT_RDMA:
                /* read the peer's data into the caller's buffers */
                kibnal_start_active_rdma (IBNAL_MSG_PUT_DONE, 0, rx, lntmsg,
                                          niov, iov, kiov, offset, mlen);
                break;
        }

        kibnal_post_rx(rx, 1, 0);
        return rc;
}
1445
1446 int
1447 kibnal_thread_start (int (*fn)(void *arg), void *arg)
1448 {
1449         long    pid = kernel_thread (fn, arg, 0);
1450
1451         if (pid < 0)
1452                 return ((int)pid);
1453
1454         atomic_inc (&kibnal_data.kib_nthreads);
1455         return (0);
1456 }
1457
void
kibnal_thread_fini (void)
{
        /* Called by each exiting thread; balances the atomic_inc in
         * kibnal_thread_start(). */
        atomic_dec (&kibnal_data.kib_nthreads);
}
1463
void
kibnal_peer_alive (kib_peer_t *peer)
{
        /* Record that we've just heard from 'peer'. */
        /* This is racy, but everyone's only writing cfs_time_current() */
        peer->ibp_last_alive = cfs_time_current();
        mb();
}
1471
void
kibnal_peer_notify (kib_peer_t *peer)
{
        /* Tell LNET the peer is dead if its last connection has gone and
         * an error was recorded; consumes (clears) peer->ibp_error so
         * the notification only happens once per failure. */
        time_t        last_alive = 0;
        int           error = 0;
        unsigned long flags;
        
        read_lock_irqsave(&kibnal_data.kib_global_lock, flags);

        if (list_empty(&peer->ibp_conns) &&
            peer->ibp_accepting == 0 &&
            peer->ibp_connecting == 0 &&
            peer->ibp_error != 0) {
                error = peer->ibp_error;
                peer->ibp_error = 0;
                /* convert jiffies-based last_alive to wall-clock seconds */
                last_alive = cfs_time_current_sec() -
                             cfs_duration_sec(cfs_time_current() -
                                              peer->ibp_last_alive);
        }
        
        read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
        
        if (error != 0)
                lnet_notify(kibnal_data.kib_ni, peer->ibp_nid, 0, last_alive);
}
1497
void
kibnal_close_conn_locked (kib_conn_t *conn, int error)
{
        /* This just does the immmediate housekeeping, and schedules the
         * connection for the reaper to finish off.
         * Caller holds kib_global_lock exclusively in irq context */
        kib_peer_t   *peer = conn->ibc_peer;

        CDEBUG (error == 0 ? D_NET : D_NETERROR,
                "closing conn to %s: error %d\n", 
                libcfs_nid2str(peer->ibp_nid), error);
        
        LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED ||
                 conn->ibc_state == IBNAL_CONN_CONNECTING);

        /* either way the reaper ends up owning one conn ref via ibc_list */
        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                /* kib_reaper_conns takes ibc_list's ref */
                list_del (&conn->ibc_list);
        } else {
                /* new ref for kib_reaper_conns */
                kibnal_conn_addref(conn);
        }
        
        if (list_empty (&peer->ibp_conns)) {   /* no more conns */
                if (peer->ibp_persistence == 0 && /* non-persistent peer */
                    kibnal_peer_active(peer))     /* still in peer table */
                        kibnal_unlink_peer_locked (peer);

                peer->ibp_error = error; /* set/clear error on last conn */
        }

        conn->ibc_state = IBNAL_CONN_DEATHROW;

        /* Schedule conn for closing/destruction */
        spin_lock (&kibnal_data.kib_reaper_lock);

        list_add_tail (&conn->ibc_list, &kibnal_data.kib_reaper_conns);
        wake_up (&kibnal_data.kib_reaper_waitq);
                
        spin_unlock (&kibnal_data.kib_reaper_lock);
}
1539
1540 int
1541 kibnal_close_conn (kib_conn_t *conn, int why)
1542 {
1543         unsigned long     flags;
1544         int               count = 0;
1545
1546         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1547
1548         LASSERT (conn->ibc_state >= IBNAL_CONN_CONNECTING);
1549         
1550         if (conn->ibc_state <= IBNAL_CONN_ESTABLISHED) {
1551                 count = 1;
1552                 kibnal_close_conn_locked (conn, why);
1553         }
1554         
1555         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1556         return (count);
1557 }
1558
void
kibnal_peer_connect_failed (kib_peer_t *peer, int active, int error)
{
        /* A connection attempt ('active' says which direction) to 'peer'
         * failed with 'error'.  If it was the last attempt in flight and
         * no connection exists, back off future reconnects and complete
         * all of the peer's queued transmits with -EHOSTUNREACH. */
        LIST_HEAD        (zombies);
        unsigned long     flags;

        LASSERT(error != 0);

        write_lock_irqsave (&kibnal_data.kib_global_lock, flags);

        if (active) {
                LASSERT (peer->ibp_connecting != 0);
                peer->ibp_connecting--;
        } else {
                LASSERT (peer->ibp_accepting != 0);
                peer->ibp_accepting--;
        }

        if (peer->ibp_connecting != 0 ||
            peer->ibp_accepting != 0) {
                /* another connection attempt under way... */
                write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
                return;
        }

        if (list_empty(&peer->ibp_conns)) {
                /* Say when active connection can be re-attempted */
                /* exponential backoff clamped to [min, max] tunables */
                peer->ibp_reconnect_interval *= 2;
                peer->ibp_reconnect_interval =
                        MAX(peer->ibp_reconnect_interval,
                            *kibnal_tunables.kib_min_reconnect_interval);
                peer->ibp_reconnect_interval =
                        MIN(peer->ibp_reconnect_interval,
                            *kibnal_tunables.kib_max_reconnect_interval);
                
                peer->ibp_reconnect_time = jiffies + 
                                           peer->ibp_reconnect_interval * HZ;
        
                /* Take peer's blocked transmits; I'll complete
                 * them with error */
                /* NB list_add + list_del_init splices the whole tx queue
                 * onto 'zombies' in one move, leaving the queue empty */
                list_add(&zombies, &peer->ibp_tx_queue);
                list_del_init(&peer->ibp_tx_queue);
                
                if (kibnal_peer_active(peer) &&
                    (peer->ibp_persistence == 0)) {
                        /* failed connection attempt on non-persistent peer */
                        kibnal_unlink_peer_locked (peer);
                }

                peer->ibp_error = error;
        } else {
                /* Can't have blocked transmits if there are connections */
                LASSERT (list_empty(&peer->ibp_tx_queue));
        }
        
        write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

        /* tell LNET the peer is down (consumes ibp_error) */
        kibnal_peer_notify(peer);
        
        if (!list_empty (&zombies))
                CDEBUG (D_NETERROR, "Deleting messages for %s: connection failed\n",
                        libcfs_nid2str(peer->ibp_nid));

        kibnal_txlist_done(&zombies, -EHOSTUNREACH);
}
1624
/* Complete a connection attempt on 'conn' with result 'status' (0 on
 * success).  'active' is non-zero for the active (connecting) side,
 * zero for the passive (accepting) side.
 *
 * On success the conn becomes ESTABLISHED, joins its peer's conn list,
 * absorbs the peer's blocked transmits and posts all its receive
 * buffers.  On failure (or if the peer was deleted meanwhile) the conn
 * is routed to cleanup and the peer is informed via
 * kibnal_peer_connect_failed(). */
void
kibnal_connreq_done (kib_conn_t *conn, int active, int status)
{
        int               state = conn->ibc_state;
        kib_peer_t       *peer = conn->ibc_peer;
        kib_tx_t         *tx;
        unsigned long     flags;
        int               rc;
        int               i;

        /* connreq scratch state is finished with, whatever the outcome */
        if (conn->ibc_connreq != NULL) {
                LIBCFS_FREE (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
                conn->ibc_connreq = NULL;
        }

        switch (state) {
        case IBNAL_CONN_CONNECTING:
                /* conn has a CM comm_id */
                if (status == 0) {
                        /* Install common (active/passive) callback for
                         * disconnect/idle notification */
                        rc = tsIbCmCallbackModify(conn->ibc_comm_id,
                                                  kibnal_conn_callback,
                                                  conn);
                        LASSERT (rc == 0);
                } else {
                        /* LASSERT (no more CM callbacks) */
                        rc = tsIbCmCallbackModify(conn->ibc_comm_id,
                                                  kibnal_bad_conn_callback,
                                                  conn);
                        LASSERT (rc == 0);
                }
                break;

        case IBNAL_CONN_INIT_QP:
                /* never reached the CM; only legal on failure */
                LASSERT (status != 0);
                break;

        default:
                LBUG();
        }

        write_lock_irqsave (&kibnal_data.kib_global_lock, flags);

        /* a connection attempt must still be outstanding on our side */
        if (active)
                LASSERT (peer->ibp_connecting != 0);
        else
                LASSERT (peer->ibp_accepting != 0);

        if (status == 0 &&                      /* connection established */
            kibnal_peer_active(peer)) {         /* peer not deleted */

                if (active)
                        peer->ibp_connecting--;
                else
                        peer->ibp_accepting--;

                conn->ibc_last_send = jiffies;
                conn->ibc_state = IBNAL_CONN_ESTABLISHED;
                kibnal_peer_alive(peer);

                /* +1 ref for ibc_list; caller(== CM)'s ref remains until
                 * the IB_CM_IDLE callback */
                kibnal_conn_addref(conn);
                list_add (&conn->ibc_list, &peer->ibp_conns);

                peer->ibp_reconnect_interval = 0; /* OK to reconnect at any time */

                /* post blocked sends to the new connection */
                spin_lock (&conn->ibc_lock);

                while (!list_empty (&peer->ibp_tx_queue)) {
                        tx = list_entry (peer->ibp_tx_queue.next,
                                         kib_tx_t, tx_list);

                        list_del (&tx->tx_list);

                        kibnal_queue_tx_locked (tx, conn);
                }

                spin_unlock (&conn->ibc_lock);

                /* Nuke any dangling conns from a different peer instance... */
                kibnal_close_stale_conns_locked (conn->ibc_peer,
                                                 conn->ibc_incarnation);

                write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

                /* queue up all the receives */
                for (i = 0; i < IBNAL_RX_MSGS; i++) {
                        /* +1 ref for rx desc */
                        kibnal_conn_addref(conn);

                        CDEBUG(D_NET, "RX[%d] %p->%p - "LPX64"\n",
                               i, &conn->ibc_rxs[i], conn->ibc_rxs[i].rx_msg,
                               conn->ibc_rxs[i].rx_vaddr);

                        kibnal_post_rx (&conn->ibc_rxs[i], 0, 0);
                }

                kibnal_check_sends (conn);
                return;
        }

        if (status == 0) {
                /* connection established, but peer was deleted.  Schedule for
                 * reaper to cm_disconnect... */
                status = -ECONNABORTED;
                kibnal_close_conn_locked (conn, status);
        } else {
                /* just waiting for refs to drain */
                conn->ibc_state = IBNAL_CONN_ZOMBIE;
        }

        write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

        kibnal_peer_connect_failed (conn->ibc_peer, active, status);
}
1743
/* Handle a passive (incoming) connection request.  Validates the
 * CONNREQ message 'msg' ('nob' bytes received on comm_id 'cid'),
 * allocates a conn and binds it to an existing peer (or a freshly
 * created one).  On success returns 0 with *connp pointing at the new
 * conn in CONNECTING state, carrying a ref on its peer; on any failure
 * a -ve errno is returned and all local allocations/refs are undone. */
int
kibnal_accept_connreq (kib_conn_t **connp, tTS_IB_CM_COMM_ID cid,
                       kib_msg_t *msg, int nob)
{
        kib_conn_t    *conn;
        kib_peer_t    *peer;
        kib_peer_t    *peer2;
        unsigned long  flags;
        int            rc;

        rc = kibnal_unpack_msg(msg, 0, nob);
        if (rc != 0) {
                CERROR("Can't unpack connreq msg: %d\n", rc);
                return -EPROTO;
        }

        CDEBUG(D_NET, "connreq from %s\n", libcfs_nid2str(msg->ibm_srcnid));

        if (msg->ibm_type != IBNAL_MSG_CONNREQ) {
                CERROR("Unexpected connreq msg type: %x from %s\n",
                       msg->ibm_type, libcfs_nid2str(msg->ibm_srcnid));
                return -EPROTO;
        }

        /* both sides must agree on the message queue depth */
        if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
                CERROR("Can't accept %s: bad queue depth %d (%d expected)\n",
                       libcfs_nid2str(msg->ibm_srcnid),
                       msg->ibm_u.connparams.ibcp_queue_depth,
                       IBNAL_MSG_QUEUE_SIZE);
                return (-EPROTO);
        }

        conn = kibnal_create_conn();
        if (conn == NULL)
                return (-ENOMEM);

        /* assume 'nid' is a new peer */
        rc = kibnal_create_peer(&peer, msg->ibm_srcnid);
        if (rc != 0) {
                kibnal_conn_decref(conn);
                return (-ENOMEM);
        }

        write_lock_irqsave (&kibnal_data.kib_global_lock, flags);

        /* Check I'm the same instance that gave the connection parameters.
         * NB If my incarnation changes after this, the peer will get nuked and
         * we'll spot that when the connection is finally added into the peer's
         * connlist */
        if (!lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
                                     msg->ibm_dstnid) ||
            msg->ibm_dststamp != kibnal_data.kib_incarnation) {
                write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

                CERROR("Stale connection params from %s\n",
                       libcfs_nid2str(msg->ibm_srcnid));
                kibnal_conn_decref(conn);
                kibnal_peer_decref(peer);
                return -ESTALE;
        }

        peer2 = kibnal_find_peer_locked(msg->ibm_srcnid);
        if (peer2 == NULL) {
                /* Brand new peer */
                LASSERT (peer->ibp_accepting == 0);

                /* peer table takes my ref on peer */
                list_add_tail (&peer->ibp_list,
                               kibnal_nid2peerlist(msg->ibm_srcnid));
        } else {
                /* tie-break connection race in favour of the higher NID */
                if (peer2->ibp_connecting != 0 &&
                    msg->ibm_srcnid < kibnal_data.kib_ni->ni_nid) {
                        write_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                                flags);
                        CWARN("Conn race %s\n",
                              libcfs_nid2str(peer2->ibp_nid));

                        kibnal_conn_decref(conn);
                        kibnal_peer_decref(peer);
                        return -EALREADY;
                }

                /* use the existing peer; drop the speculative one */
                kibnal_peer_decref(peer);
                peer = peer2;
        }

        /* +1 ref for conn */
        kibnal_peer_addref(peer);
        peer->ibp_accepting++;

        write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

        conn->ibc_peer = peer;
        conn->ibc_state = IBNAL_CONN_CONNECTING;
        conn->ibc_comm_id = cid;
        conn->ibc_incarnation = msg->ibm_srcstamp;
        conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
        conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
        conn->ibc_version = msg->ibm_version;

        *connp = conn;
        return (0);
}
1848
1849 tTS_IB_CM_CALLBACK_RETURN
1850 kibnal_bad_conn_callback (tTS_IB_CM_EVENT event,
1851                           tTS_IB_CM_COMM_ID cid,
1852                           void *param,
1853                           void *arg)
1854 {
1855         CERROR ("Unexpected event %d: conn %p\n", event, arg);
1856         LBUG ();
1857         return TS_IB_CM_CALLBACK_PROCEED;
1858 }
1859
/* Abort all transmits on list 'txs' of 'conn' with -ECONNABORTED.
 * Transmits with no send still in flight are moved to a local zombie
 * list and completed outside the lock; those still sending will be
 * completed by their send-completion callback, which sees the error
 * status set here. */
void
kibnal_abort_txs (kib_conn_t *conn, struct list_head *txs)
{
        LIST_HEAD        (zombies);
        struct list_head *tmp;
        struct list_head *nxt;
        kib_tx_t         *tx;
        unsigned long     flags;

        spin_lock_irqsave (&conn->ibc_lock, flags);

        list_for_each_safe (tmp, nxt, txs) {
                tx = list_entry (tmp, kib_tx_t, tx_list);

                /* sanity: tx state must be consistent with the queue it's on */
                if (txs == &conn->ibc_active_txs) {
                        LASSERT (tx->tx_passive_rdma ||
                                 !tx->tx_passive_rdma_wait);

                        LASSERT (tx->tx_passive_rdma_wait ||
                                 tx->tx_sending != 0);
                } else {
                        LASSERT (!tx->tx_passive_rdma_wait);
                        LASSERT (tx->tx_sending == 0);
                }

                tx->tx_status = -ECONNABORTED;
                tx->tx_passive_rdma_wait = 0;

                /* only reap now if no send completion is outstanding */
                if (tx->tx_sending == 0) {
                        list_del (&tx->tx_list);
                        list_add (&tx->tx_list, &zombies);
                }
        }

        spin_unlock_irqrestore (&conn->ibc_lock, flags);

        kibnal_txlist_done (&zombies, -ECONNABORTED);
}
1898
/* CM callback installed on every established connection ('arg' is the
 * conn).  Handles disconnect/idle notification; any other event is
 * treated as an error and closes the connection. */
tTS_IB_CM_CALLBACK_RETURN
kibnal_conn_callback (tTS_IB_CM_EVENT event,
                      tTS_IB_CM_COMM_ID cid,
                      void *param,
                      void *arg)
{
        kib_conn_t       *conn = arg;
        int               rc;

        /* Established Connection Notifier */

        switch (event) {
        default:
                CDEBUG(D_NETERROR, "Connection %p -> %s ERROR %d\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), event);
                kibnal_close_conn (conn, -ECONNABORTED);
                break;

        case TS_IB_CM_DISCONNECTED:
                CDEBUG(D_NETERROR, "Connection %p -> %s DISCONNECTED.\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                kibnal_close_conn (conn, 0);
                break;

        case TS_IB_CM_IDLE:
                /* final CM event: the connection is fully torn down */
                CDEBUG(D_NET, "Connection %p -> %s IDLE.\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));

                /* LASSERT (no further callbacks) */
                rc = tsIbCmCallbackModify(cid, kibnal_bad_conn_callback, conn);
                LASSERT (rc == 0);

                /* NB we wait until the connection has closed before
                 * completing outstanding passive RDMAs so we can be sure
                 * the network can't touch the mapped memory any more. */

                kibnal_abort_txs(conn, &conn->ibc_tx_queue);
                kibnal_abort_txs(conn, &conn->ibc_tx_queue_rsrvd);
                kibnal_abort_txs(conn, &conn->ibc_tx_queue_nocred);
                kibnal_abort_txs(conn, &conn->ibc_active_txs);

                kibnal_conn_decref(conn);        /* Lose CM's ref */
                break;
        }

        return TS_IB_CM_CALLBACK_PROCEED;
}
1946
/* CM callback for the passive side of connection establishment.
 * 'arg' is NULL until REQ_RECEIVED installs the newly-created conn as
 * the callback arg.  REQ_RECEIVED validates the request and fills in
 * the CONNACK accept parameters; ESTABLISHED completes the handshake;
 * anything else aborts the attempt. */
tTS_IB_CM_CALLBACK_RETURN
kibnal_passive_conn_callback (tTS_IB_CM_EVENT event,
                              tTS_IB_CM_COMM_ID cid,
                              void *param,
                              void *arg)
{
        kib_conn_t  *conn = arg;
        int          rc;

        switch (event) {
        default:
                if (conn == NULL) {
                        /* no connection yet */
                        CERROR ("Unexpected event: %d\n", event);
                        return TS_IB_CM_CALLBACK_ABORT;
                }

                CERROR ("%s event %p -> %s: %d\n",
                        (event == TS_IB_CM_IDLE) ? "IDLE" : "Unexpected",
                        conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), event);
                kibnal_connreq_done(conn, 0, -ECONNABORTED);
                kibnal_conn_decref(conn); /* drop CM's ref */
                return TS_IB_CM_CALLBACK_ABORT;

        case TS_IB_CM_REQ_RECEIVED: {
                struct ib_cm_req_received_param *req = param;
                kib_msg_t                       *msg = req->remote_private_data;

                LASSERT (conn == NULL);

                /* Don't really know srcnid until successful unpack */
                CDEBUG(D_NET, "REQ from ?%s?\n", libcfs_nid2str(msg->ibm_srcnid));

                rc = kibnal_accept_connreq(&conn, cid, msg,
                                           req->remote_private_data_len);
                if (rc != 0) {
                        CERROR ("Can't accept ?%s?: %d\n",
                                libcfs_nid2str(msg->ibm_srcnid), rc);
                        return TS_IB_CM_CALLBACK_ABORT;
                }

                /* update 'arg' for next callback */
                rc = tsIbCmCallbackModify(cid, kibnal_passive_conn_callback, conn);
                LASSERT (rc == 0);

                /* build the CONNACK we reply with in the accept */
                msg = req->accept_param.reply_private_data;
                kibnal_init_msg(msg, IBNAL_MSG_CONNACK,
                                sizeof(msg->ibm_u.connparams));

                msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;

                kibnal_pack_msg(msg, conn->ibc_version, 0,
                                conn->ibc_peer->ibp_nid,
                                conn->ibc_incarnation);

                req->accept_param.qp                     = conn->ibc_qp;
                req->accept_param.reply_private_data_len = msg->ibm_nob;
                req->accept_param.responder_resources    = IBNAL_RESPONDER_RESOURCES;
                req->accept_param.initiator_depth        = IBNAL_RESPONDER_RESOURCES;
                req->accept_param.rnr_retry_count        = IBNAL_RNR_RETRY;
                req->accept_param.flow_control           = IBNAL_FLOW_CONTROL;

                CDEBUG(D_NET, "Proceeding\n");
                return TS_IB_CM_CALLBACK_PROCEED; /* CM takes my ref on conn */
        }

        case TS_IB_CM_ESTABLISHED:
                LASSERT (conn != NULL);
                CWARN("Connection %p -> %s ESTABLISHED.\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));

                kibnal_connreq_done(conn, 0, 0);
                return TS_IB_CM_CALLBACK_PROCEED;
        }
}
2022
/* CM callback for the active side of connection establishment ('arg'
 * is the conn doing the connecting).  REP_RECEIVED validates the
 * peer's CONNACK; ESTABLISHED completes the handshake; IDLE retries
 * (assumed stale-QP rejection); anything else aborts the attempt. */
tTS_IB_CM_CALLBACK_RETURN
kibnal_active_conn_callback (tTS_IB_CM_EVENT event,
                             tTS_IB_CM_COMM_ID cid,
                             void *param,
                             void *arg)
{
        kib_conn_t    *conn = arg;
        unsigned long  flags;

        switch (event) {
        case TS_IB_CM_REP_RECEIVED: {
                struct ib_cm_rep_received_param *rep = param;
                kib_msg_t                       *msg = rep->remote_private_data;
                int                              nob = rep->remote_private_data_len;
                int                              rc;

                rc = kibnal_unpack_msg(msg, conn->ibc_version, nob);
                if (rc != 0) {
                        CERROR ("Error %d unpacking conn ack from %s\n",
                                rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        kibnal_connreq_done(conn, 1, rc);
                        kibnal_conn_decref(conn); /* drop CM's ref */
                        return TS_IB_CM_CALLBACK_ABORT;
                }

                if (msg->ibm_type != IBNAL_MSG_CONNACK) {
                        CERROR ("Unexpected conn ack type %d from %s\n",
                                msg->ibm_type,
                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        kibnal_connreq_done(conn, 1, -EPROTO);
                        kibnal_conn_decref(conn); /* drop CM's ref */
                        return TS_IB_CM_CALLBACK_ABORT;
                }

                /* the ack must come from the instance I sent the request to */
                if (!lnet_ptlcompat_matchnid(conn->ibc_peer->ibp_nid,
                                             msg->ibm_srcnid) ||
                    !lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
                                             msg->ibm_dstnid) ||
                    msg->ibm_srcstamp != conn->ibc_incarnation ||
                    msg->ibm_dststamp != kibnal_data.kib_incarnation) {
                        CERROR("Stale conn ack from %s\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        kibnal_connreq_done(conn, 1, -ESTALE);
                        kibnal_conn_decref(conn); /* drop CM's ref */
                        return TS_IB_CM_CALLBACK_ABORT;
                }

                if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
                        CERROR ("Bad queue depth %d from %s\n",
                                msg->ibm_u.connparams.ibcp_queue_depth,
                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        kibnal_connreq_done(conn, 1, -EPROTO);
                        kibnal_conn_decref(conn); /* drop CM's ref */
                        return TS_IB_CM_CALLBACK_ABORT;
                }

                CDEBUG(D_NET, "Connection %p -> %s REP_RECEIVED.\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));

                conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
                conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
                return TS_IB_CM_CALLBACK_PROCEED;
        }

        case TS_IB_CM_ESTABLISHED:
                CWARN("Connection %p -> %s ESTABLISHED\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));

                kibnal_connreq_done(conn, 1, 0);
                return TS_IB_CM_CALLBACK_PROCEED;

        case TS_IB_CM_IDLE:
                CDEBUG(D_NETERROR, "Connection %p -> %s IDLE\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                /* I assume this connection attempt was rejected because the
                 * peer found a stale QP; I'll just try again */
                write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
                kibnal_schedule_active_connect_locked(conn->ibc_peer);
                write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);

                kibnal_connreq_done(conn, 1, -ECONNABORTED);
                kibnal_conn_decref(conn); /* drop CM's ref */
                return TS_IB_CM_CALLBACK_ABORT;

        default:
                CDEBUG(D_NETERROR, "Connection %p -> %s ERROR %d\n",
                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), event);
                kibnal_connreq_done(conn, 1, -ECONNABORTED);
                kibnal_conn_decref(conn); /* drop CM's ref */
                return TS_IB_CM_CALLBACK_ABORT;
        }
}
2115
/* Path-record query completion for an active connect ('arg' is the
 * conn).  On success, builds the CONNREQ message and connect
 * parameters from the resolved path and hands the conn to the CM via
 * ib_cm_connect(); on failure, fails the connection attempt.  Always
 * returns non-zero to suppress further path-record callbacks. */
int
kibnal_pathreq_callback (tTS_IB_CLIENT_QUERY_TID tid, int status,
                          struct ib_path_record *resp, int remaining,
                          void *arg)
{
        kib_conn_t *conn = arg;
        kib_peer_t *peer = conn->ibc_peer;
        kib_msg_t  *msg = &conn->ibc_connreq->cr_msg;

        if (status != 0) {
                CDEBUG (D_NETERROR, "Pathreq %p -> %s failed: %d\n",
                        conn, libcfs_nid2str(peer->ibp_nid), status);
                kibnal_connreq_done(conn, 1, status);
                kibnal_conn_decref(conn); /* drop callback's ref */
                return 1;    /* non-zero prevents further callbacks */
        }

        conn->ibc_connreq->cr_path = *resp;

        /* build the CONNREQ carried as the CM request's private data */
        kibnal_init_msg(msg, IBNAL_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
        msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
        kibnal_pack_msg(msg, conn->ibc_version, 0,
                        peer->ibp_nid, conn->ibc_incarnation);

        conn->ibc_connreq->cr_connparam = (struct ib_cm_active_param) {
                .qp                   = conn->ibc_qp,
                .req_private_data     = msg,
                .req_private_data_len = msg->ibm_nob,
                .responder_resources  = IBNAL_RESPONDER_RESOURCES,
                .initiator_depth      = IBNAL_RESPONDER_RESOURCES,
                .retry_count          = IBNAL_RETRY,
                .rnr_retry_count      = IBNAL_RNR_RETRY,
                .cm_response_timeout  = *kibnal_tunables.kib_timeout,
                .max_cm_retries       = IBNAL_CM_RETRY,
                .flow_control         = IBNAL_FLOW_CONTROL,
        };

        /* XXX set timeout just like SDP!!!*/
        conn->ibc_connreq->cr_path.packet_life = 13;

        /* Flag I'm getting involved with the CM... */
        conn->ibc_state = IBNAL_CONN_CONNECTING;

        CDEBUG(D_NET, "Connecting to, service id "LPX64", on %s\n",
               conn->ibc_connreq->cr_svcrsp.ibsr_svc_id,
               libcfs_nid2str(peer->ibp_nid));

        /* kibnal_connect_callback gets my conn ref */
        status = ib_cm_connect (&conn->ibc_connreq->cr_connparam,
                                &conn->ibc_connreq->cr_path, NULL,
                                conn->ibc_connreq->cr_svcrsp.ibsr_svc_id, 0,
                                kibnal_active_conn_callback, conn,
                                &conn->ibc_comm_id);
        if (status != 0) {
                CERROR ("Connect %p -> %s failed: %d\n",
                        conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), status);
                /* Back out state change: I've not got a CM comm_id yet... */
                conn->ibc_state = IBNAL_CONN_INIT_QP;
                kibnal_connreq_done(conn, 1, status);
                kibnal_conn_decref(conn); /* Drop callback's ref */
        }

        return 1;    /* non-zero to prevent further callbacks */
}
2180
/* Start an active connection attempt to 'peer': allocate a conn and
 * its connreq state, query the peer's service, then kick off the path
 * record lookup whose callback continues the handshake.  On any
 * failure the attempt is completed with an error and all refs taken
 * here are dropped. */
void
kibnal_connect_peer (kib_peer_t *peer)
{
        kib_conn_t  *conn;
        int          rc;

        conn = kibnal_create_conn();
        if (conn == NULL) {
                CERROR ("Can't allocate conn\n");
                kibnal_peer_connect_failed (peer, 1, -ENOMEM);
                return;
        }

        /* conn takes its own ref on the peer */
        conn->ibc_peer = peer;
        kibnal_peer_addref(peer);

        LIBCFS_ALLOC (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
        if (conn->ibc_connreq == NULL) {
                CERROR ("Can't allocate connreq\n");
                kibnal_connreq_done(conn, 1, -ENOMEM);
                kibnal_conn_decref(conn); /* drop my ref */
                return;
        }

        memset(conn->ibc_connreq, 0, sizeof (*conn->ibc_connreq));

        rc = kibnal_make_svcqry(conn);
        if (rc != 0) {
                kibnal_connreq_done (conn, 1, rc);
                kibnal_conn_decref(conn); /* drop my ref */
                return;
        }

        /* fetch my own port GID for the path record request */
        rc = ib_cached_gid_get(kibnal_data.kib_device,
                               kibnal_data.kib_port, 0,
                               conn->ibc_connreq->cr_gid);
        LASSERT (rc == 0);

        /* kibnal_pathreq_callback gets my conn ref */
        rc = tsIbPathRecordRequest (kibnal_data.kib_device,
                                    kibnal_data.kib_port,
                                    conn->ibc_connreq->cr_gid,
                                    conn->ibc_connreq->cr_svcrsp.ibsr_svc_gid,
                                    conn->ibc_connreq->cr_svcrsp.ibsr_svc_pkey,
                                    0,
                                    *kibnal_tunables.kib_timeout * HZ,
                                    0,
                                    kibnal_pathreq_callback, conn,
                                    &conn->ibc_connreq->cr_tid);
        if (rc == 0)
                return; /* callback now has my ref on conn */

        CERROR ("Path record request %p -> %s failed: %d\n",
                conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
        kibnal_connreq_done(conn, 1, rc);
        kibnal_conn_decref(conn); /* drop my ref */
}
2238
2239 int
2240 kibnal_check_txs (kib_conn_t *conn, struct list_head *txs)
2241 {
2242         kib_tx_t          *tx;
2243         struct list_head  *ttmp;
2244         unsigned long      flags;
2245         int                timed_out = 0;
2246
2247         spin_lock_irqsave (&conn->ibc_lock, flags);
2248
2249         list_for_each (ttmp, txs) {
2250                 tx = list_entry (ttmp, kib_tx_t, tx_list);
2251
2252                 if (txs == &conn->ibc_active_txs) {
2253                         LASSERT (tx->tx_passive_rdma ||
2254                                  !tx->tx_passive_rdma_wait);
2255
2256                         LASSERT (tx->tx_passive_rdma_wait ||
2257                                  tx->tx_sending != 0);
2258                 } else {
2259                         LASSERT (!tx->tx_passive_rdma_wait);
2260                         LASSERT (tx->tx_sending == 0);
2261                 }
2262                 
2263                 if (time_after_eq (jiffies, tx->tx_deadline)) {
2264                         timed_out = 1;
2265                         break;
2266                 }
2267         }
2268
2269         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2270         return timed_out;
2271 }
2272
2273 int
2274 kibnal_conn_timed_out (kib_conn_t *conn)
2275 {
2276         return  kibnal_check_txs(conn, &conn->ibc_tx_queue) ||
2277                 kibnal_check_txs(conn, &conn->ibc_tx_queue_rsrvd) ||
2278                 kibnal_check_txs(conn, &conn->ibc_tx_queue_nocred) ||
2279                 kibnal_check_txs(conn, &conn->ibc_active_txs);
2280 }
2281
/* Scan peer hash bucket 'idx' for established connections with timed
 * out transmits and close them.  NB the shared (read) lock must be
 * dropped before closing, so after closing one conn the whole bucket
 * is rescanned from the start. */
void
kibnal_check_conns (int idx)
{
        struct list_head  *peers = &kibnal_data.kib_peers[idx];
        struct list_head  *ptmp;
        kib_peer_t        *peer;
        kib_conn_t        *conn;
        struct list_head  *ctmp;
        unsigned long      flags;

 again:
        /* NB. We expect to have a look at all the peers and not find any
         * rdmas to time out, so we just use a shared lock while we
         * take a look... */
        read_lock_irqsave(&kibnal_data.kib_global_lock, flags);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, kib_peer_t, ibp_list);

                list_for_each (ctmp, &peer->ibp_conns) {
                        conn = list_entry (ctmp, kib_conn_t, ibc_list);

                        LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);


                        /* In case we have enough credits to return via a
                         * NOOP, but there were no non-blocking tx descs
                         * free to do it last time... */
                        kibnal_check_sends(conn);

                        if (!kibnal_conn_timed_out(conn))
                                continue;

                        /* +1 ref keeps conn alive while the lock is dropped */
                        kibnal_conn_addref(conn);

                        read_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                               flags);

                        CERROR("Timed out RDMA with %s\n",
                               libcfs_nid2str(peer->ibp_nid));

                        kibnal_close_conn (conn, -ETIMEDOUT);
                        kibnal_conn_decref(conn);

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
}
2333
2334 void
2335 kibnal_terminate_conn (kib_conn_t *conn)
2336 {
2337         int           rc;
2338
2339         CDEBUG(D_NET, "conn %p\n", conn);
2340         LASSERT (conn->ibc_state == IBNAL_CONN_DEATHROW);
2341         conn->ibc_state = IBNAL_CONN_ZOMBIE;
2342
2343         rc = ib_cm_disconnect (conn->ibc_comm_id);
2344         if (rc != 0)
2345                 CERROR ("Error %d disconnecting conn %p -> %s\n",
2346                         rc, conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
2347
2348         kibnal_peer_notify(conn->ibc_peer);
2349 }
2350
/* Reaper thread: destroys connections queued on kib_reaper_conns and
 * periodically walks the peer hash table to detect and close
 * connections whose RDMAs have timed out.  Runs until kib_shutdown
 * is set.  Always returns 0. */
int
kibnal_reaper (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        kib_conn_t        *conn;
        int                timeout;
        int                i;
        int                peer_index = 0;        /* next peer hash bucket to scan */
        unsigned long      deadline = jiffies;    /* time the next timeout scan is due */
        
        cfs_daemonize ("kibnal_reaper");
        cfs_block_allsigs ();

        init_waitqueue_entry (&wait, current);

        spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);

        while (!kibnal_data.kib_shutdown) {
                if (!list_empty (&kibnal_data.kib_reaper_conns)) {
                        /* dequeue one conn and dispose of it with the
                         * reaper lock dropped */
                        conn = list_entry (kibnal_data.kib_reaper_conns.next,
                                           kib_conn_t, ibc_list);
                        list_del (&conn->ibc_list);
                        
                        spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);

                        switch (conn->ibc_state) {
                        case IBNAL_CONN_DEATHROW:
                                LASSERT (conn->ibc_comm_id != TS_IB_CM_COMM_ID_INVALID);
                                /* Disconnect: conn becomes a zombie in the
                                 * callback and last ref reschedules it
                                 * here... */
                                kibnal_terminate_conn(conn);
                                kibnal_conn_decref(conn);
                                break;

                        case IBNAL_CONN_INIT_QP:
                        case IBNAL_CONN_ZOMBIE:
                                kibnal_destroy_conn (conn);
                                break;
                                
                        default:
                                /* no other state should ever be queued here */
                                CERROR ("Bad conn %p state: %d\n",
                                        conn, conn->ibc_state);
                                LBUG();
                        }

                        spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
                        continue;
                }

                spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);

                /* careful with the jiffy wrap... */
                while ((timeout = (int)(deadline - jiffies)) <= 0) {
                        const int n = 4;
                        const int p = 1;
                        int       chunk = kibnal_data.kib_peer_hash_size;
                        
                        /* Time to check for RDMA timeouts on a few more
                         * peers: I do checks every 'p' seconds on a
                         * proportion of the peer table and I need to check
                         * every connection 'n' times within a timeout
                         * interval, to ensure I detect a timeout on any
                         * connection within (n+1)/n times the timeout
                         * interval. */

                        if (*kibnal_tunables.kib_timeout > n * p)
                                chunk = (chunk * n * p) / 
                                        *kibnal_tunables.kib_timeout;
                        if (chunk == 0)
                                chunk = 1;

                        for (i = 0; i < chunk; i++) {
                                kibnal_check_conns (peer_index);
                                peer_index = (peer_index + 1) % 
                                             kibnal_data.kib_peer_hash_size;
                        }

                        deadline += p * HZ;
                }

                /* record when this thread next expects to wake (presumably
                 * read by a liveness/watchdog check elsewhere -- confirm) */
                kibnal_data.kib_reaper_waketime = jiffies + timeout;

                /* sleep until the next scan deadline, or until woken
                 * because a conn was queued for reaping */
                set_current_state (TASK_INTERRUPTIBLE);
                add_wait_queue (&kibnal_data.kib_reaper_waitq, &wait);

                schedule_timeout (timeout);

                set_current_state (TASK_RUNNING);
                remove_wait_queue (&kibnal_data.kib_reaper_waitq, &wait);

                spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
        }

        spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);

        kibnal_thread_fini ();
        return (0);
}
2451
/* Connection daemon: services incoming service queries on accepted
 * sockets (kib_connd_acceptq) and initiates outgoing connections to
 * queued peers (kib_connd_peers).  Multiple instances run, identified
 * by 'arg'; all exit when kib_shutdown is set.  Always returns 0. */
int
kibnal_connd (void *arg)
{
        long               id = (long)arg;      /* daemon instance number */
        char               name[16];
        wait_queue_t       wait;
        unsigned long      flags;
        kib_peer_t        *peer;
        kib_acceptsock_t  *as;
        int                did_something;

        snprintf(name, sizeof(name), "kibnal_connd_%02ld", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        init_waitqueue_entry (&wait, current);

        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);

        while (!kibnal_data.kib_shutdown) {
                did_something = 0;

                if (!list_empty (&kibnal_data.kib_connd_acceptq)) {
                        /* service one accepted socket with the lock dropped */
                        as = list_entry (kibnal_data.kib_connd_acceptq.next,
                                         kib_acceptsock_t, ibas_list);
                        list_del (&as->ibas_list);
                        
                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

                        kibnal_handle_svcqry(as->ibas_sock);
                        kibnal_free_acceptsock(as);
                        
                        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
                        did_something = 1;
                }
                        
                /* Only handle an outgoing connection request if there is someone left
                 * to handle an incoming svcqry */
                if (!list_empty (&kibnal_data.kib_connd_peers) &&
                    ((kibnal_data.kib_connd_connecting + 1) < 
                     *kibnal_tunables.kib_n_connd)) {
                        peer = list_entry (kibnal_data.kib_connd_peers.next,
                                           kib_peer_t, ibp_connd_list);
                        
                        /* count myself as connecting while the lock is dropped,
                         * so other connds stay available for svcqrys */
                        list_del_init (&peer->ibp_connd_list);
                        kibnal_data.kib_connd_connecting++;
                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

                        /* the decref presumably balances a ref taken when the
                         * peer was queued on kib_connd_peers -- confirm */
                        kibnal_connect_peer (peer);
                        kibnal_peer_decref(peer);

                        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                        did_something = 1;
                        kibnal_data.kib_connd_connecting--;
                }

                if (did_something)
                        continue;       /* more work may have arrived meanwhile */

                /* idle: block until woken by new work */
                set_current_state (TASK_INTERRUPTIBLE);
                add_wait_queue_exclusive(&kibnal_data.kib_connd_waitq, &wait);

                spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

                schedule();

                set_current_state (TASK_RUNNING);
                remove_wait_queue (&kibnal_data.kib_connd_waitq, &wait);

                spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
        }

        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

        kibnal_thread_fini ();
        return (0);
}
2529
/* Scheduler thread: finalizes tx descriptors deferred to thread context
 * (queued on kib_sched_txq by kibnal_schedule_tx_done() from IRQ
 * context) and processes received messages from kib_sched_rxq.
 * Multiple instances run, identified by 'arg'; all exit when
 * kib_shutdown is set.  Always returns 0. */
int
kibnal_scheduler(void *arg)
{
        long            id = (long)arg;         /* scheduler instance number */
        char            name[16];
        kib_rx_t       *rx;
        kib_tx_t       *tx;
        unsigned long   flags;
        int             rc;
        int             counter = 0;            /* iterations since last yield */
        int             did_something;

        snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);

        while (!kibnal_data.kib_shutdown) {
                did_something = 0;

                /* drain all deferred tx completions; kibnal_tx_done() must
                 * run with the scheduler lock dropped */
                while (!list_empty(&kibnal_data.kib_sched_txq)) {
                        tx = list_entry(kibnal_data.kib_sched_txq.next,
                                        kib_tx_t, tx_list);
                        list_del(&tx->tx_list);
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);
                        kibnal_tx_done(tx);

                        spin_lock_irqsave(&kibnal_data.kib_sched_lock,
                                          flags);
                }
                /* NOTE(review): did_something is not set by the tx loop
                 * above, so a tx-only pass falls through to the sleep
                 * below; no work is lost since the wait condition
                 * re-checks both queues -- confirm this is intentional */

                if (!list_empty(&kibnal_data.kib_sched_rxq)) {
                        /* handle one rx with the lock dropped */
                        rx = list_entry(kibnal_data.kib_sched_rxq.next,
                                        kib_rx_t, rx_list);
                        list_del(&rx->rx_list);
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);

                        kibnal_rx(rx);

                        did_something = 1;
                        spin_lock_irqsave(&kibnal_data.kib_sched_lock,
                                          flags);
                }

                /* nothing to do or hogging CPU */
                if (!did_something || counter++ == IBNAL_RESCHED) {
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);
                        counter = 0;

                        if (!did_something) {
                                /* idle: sleep until work arrives or shutdown */
                                rc = wait_event_interruptible_exclusive(
                                        kibnal_data.kib_sched_waitq,
                                        !list_empty(&kibnal_data.kib_sched_txq) || 
                                        !list_empty(&kibnal_data.kib_sched_rxq) || 
                                        kibnal_data.kib_shutdown);
                        } else {
                                /* busy: yield to avoid monopolising the CPU */
                                our_cond_resched();
                        }

                        spin_lock_irqsave(&kibnal_data.kib_sched_lock,
                                          flags);
                }
        }

        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);

        kibnal_thread_fini();
        return (0);
}