Whamcloud - gitweb
b=10778,i=eeb:
[fs/lustre-release.git] / lnet / klnds / openiblnd / openiblnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include "openiblnd.h"
25
26 /*
27  *  LIB functions follow
28  *
29  */
30 void
31 kibnal_schedule_tx_done (kib_tx_t *tx)
32 {
33         unsigned long flags;
34
35         spin_lock_irqsave (&kibnal_data.kib_sched_lock, flags);
36
37         list_add_tail(&tx->tx_list, &kibnal_data.kib_sched_txq);
38         wake_up (&kibnal_data.kib_sched_waitq);
39
40         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
41 }
42
43 void
44 kibnal_tx_done (kib_tx_t *tx)
45 {
46         lnet_msg_t      *lntmsg[2];
47         unsigned long    flags;
48         int              i;
49         int              rc;
50
51         LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting callback */
52         LASSERT (!tx->tx_passive_rdma_wait);    /* mustn't be awaiting RDMA */
53
54         if (in_interrupt()) {
55                 /* can't deregister memory/flush FMAs/finalize in IRQ context... */
56                 kibnal_schedule_tx_done(tx);
57                 return;
58         }
59
60         switch (tx->tx_mapped) {
61         default:
62                 LBUG();
63
64         case KIB_TX_UNMAPPED:
65                 break;
66                 
67         case KIB_TX_MAPPED:
68                 rc = ib_memory_deregister(tx->tx_md.md_handle.mr);
69                 LASSERT (rc == 0);
70                 tx->tx_mapped = KIB_TX_UNMAPPED;
71                 break;
72
73 #if IBNAL_FMR
74         case KIB_TX_MAPPED_FMR:
75                 rc = ib_fmr_deregister(tx->tx_md.md_handle.fmr);
76                 LASSERT (rc == 0);
77
78 #ifndef USING_TSAPI
79                 /* Somewhat belt-and-braces since the tx's conn has closed if
80                  * this was a passive RDMA waiting to complete... */
81                 if (tx->tx_status != 0)
82                         ib_fmr_pool_force_flush(kibnal_data.kib_fmr_pool);
83 #endif
84                 tx->tx_mapped = KIB_TX_UNMAPPED;
85                 break;
86 #endif
87         }
88
89         /* tx may have up to 2 ptlmsgs to finalise */
90         lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
91         lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
92         rc = tx->tx_status;
93
94         if (tx->tx_conn != NULL) {
95                 kibnal_conn_decref(tx->tx_conn);
96                 tx->tx_conn = NULL;
97         }
98
99         tx->tx_nsp = 0;
100         tx->tx_passive_rdma = 0;
101         tx->tx_status = 0;
102
103         spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);
104
105         list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_txs);
106
107         spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
108
109         /* delay finalize until my descs have been freed */
110         for (i = 0; i < 2; i++) {
111                 if (lntmsg[i] == NULL)
112                         continue;
113
114                 lnet_finalize (kibnal_data.kib_ni, lntmsg[i], rc);
115         }
116 }
117
118 kib_tx_t *
119 kibnal_get_idle_tx (void) 
120 {
121         unsigned long  flags;
122         kib_tx_t      *tx;
123         
124         spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);
125
126         if (list_empty (&kibnal_data.kib_idle_txs)) {
127                 spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
128                 return NULL;
129         }
130
131         tx = list_entry (kibnal_data.kib_idle_txs.next, kib_tx_t, tx_list);
132         list_del (&tx->tx_list);
133
134         /* Allocate a new passive RDMA completion cookie.  It might not be
135          * needed, but we've got a lock right now and we're unlikely to
136          * wrap... */
137         tx->tx_passive_rdma_cookie = kibnal_data.kib_next_tx_cookie++;
138
139         spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
140
141         LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
142         LASSERT (tx->tx_nsp == 0);
143         LASSERT (tx->tx_sending == 0);
144         LASSERT (tx->tx_status == 0);
145         LASSERT (tx->tx_conn == NULL);
146         LASSERT (!tx->tx_passive_rdma);
147         LASSERT (!tx->tx_passive_rdma_wait);
148         LASSERT (tx->tx_lntmsg[0] == NULL);
149         LASSERT (tx->tx_lntmsg[1] == NULL);
150
151         return tx;
152 }
153
154 void
155 kibnal_complete_passive_rdma(kib_conn_t *conn, __u64 cookie, int status)
156 {
157         struct list_head *ttmp;
158         unsigned long     flags;
159         int               idle;
160
161         spin_lock_irqsave (&conn->ibc_lock, flags);
162
163         list_for_each (ttmp, &conn->ibc_active_txs) {
164                 kib_tx_t *tx = list_entry(ttmp, kib_tx_t, tx_list);
165
166                 LASSERT (tx->tx_passive_rdma ||
167                          !tx->tx_passive_rdma_wait);
168
169                 LASSERT (tx->tx_passive_rdma_wait ||
170                          tx->tx_sending != 0);
171
172                 if (!tx->tx_passive_rdma_wait ||
173                     tx->tx_passive_rdma_cookie != cookie)
174                         continue;
175
176                 CDEBUG(D_NET, "Complete %p "LPD64": %d\n", tx, cookie, status);
177
178                 /* XXX Set mlength of reply here */
179
180                 tx->tx_status = status;
181                 tx->tx_passive_rdma_wait = 0;
182                 idle = (tx->tx_sending == 0);
183
184                 if (idle)
185                         list_del (&tx->tx_list);
186
187                 spin_unlock_irqrestore (&conn->ibc_lock, flags);
188
189                 /* I could be racing with tx callbacks.  It's whoever
190                  * _makes_ tx idle that frees it */
191                 if (idle)
192                         kibnal_tx_done (tx);
193                 return;
194         }
195                 
196         spin_unlock_irqrestore (&conn->ibc_lock, flags);
197
198         CERROR ("Unmatched (late?) RDMA completion "LPX64" from %s\n",
199                 cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
200 }
201
202 void
203 kibnal_post_rx (kib_rx_t *rx, int credit, int rsrvd_credit)
204 {
205         kib_conn_t   *conn = rx->rx_conn;
206         int           rc;
207         unsigned long flags;
208
209         LASSERT(!rsrvd_credit ||
210                 conn->ibc_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
211
212         rx->rx_gl = (struct ib_gather_scatter) {
213                 .address = rx->rx_vaddr,
214                 .length  = IBNAL_MSG_SIZE,
215                 .key     = conn->ibc_rx_pages->ibp_lkey,
216         };
217
218         rx->rx_sp = (struct ib_receive_param) {
219                 .work_request_id        = kibnal_ptr2wreqid(rx, 1),
220                 .scatter_list           = &rx->rx_gl,
221                 .num_scatter_entries    = 1,
222                 .device_specific        = NULL,
223                 .signaled               = 1,
224         };
225
226         LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
227         LASSERT (rx->rx_nob >= 0);              /* not posted */
228         rx->rx_nob = -1;                        /* is now */
229         mb();
230
231         if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
232                 rc = -ECONNABORTED;
233         else
234                 rc = kibnal_ib_receive(conn->ibc_qp, &rx->rx_sp);
235
236         if (rc == 0) {
237                 if (credit || rsrvd_credit) {
238                         spin_lock_irqsave(&conn->ibc_lock, flags);
239
240                         if (credit)
241                                 conn->ibc_outstanding_credits++;
242                         if (rsrvd_credit)
243                                 conn->ibc_reserved_credits++;
244                         
245                         spin_unlock_irqrestore(&conn->ibc_lock, flags);
246
247                         kibnal_check_sends(conn);
248                 }
249                 return;
250         }
251
252         if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
253                 CERROR ("Error posting receive -> %s: %d\n",
254                         libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
255                 kibnal_close_conn (rx->rx_conn, rc);
256         } else {
257                 CDEBUG (D_NET, "Error posting receive -> %s: %d\n",
258                         libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
259         }
260
261         /* Drop rx's ref */
262         kibnal_conn_decref(conn);
263 }
264
265 void
266 kibnal_rx_callback (struct ib_cq_entry *e)
267 {
268         kib_rx_t     *rx = (kib_rx_t *)kibnal_wreqid2ptr(e->work_request_id);
269         kib_msg_t    *msg = rx->rx_msg;
270         kib_conn_t   *conn = rx->rx_conn;
271         int           credits;
272         unsigned long flags;
273         int           rc;
274         int           err = -ECONNABORTED;
275
276         CDEBUG (D_NET, "rx %p conn %p\n", rx, conn);
277         LASSERT (rx->rx_nob < 0);               /* was posted */
278         rx->rx_nob = 0;                         /* isn't now */
279         mb();
280
281         /* receives complete with error in any case after we've started
282          * closing the QP */
283         if (conn->ibc_state >= IBNAL_CONN_DEATHROW)
284                 goto failed;
285
286         /* We don't post receives until the conn is established */
287         LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);
288
289         if (e->status != IB_COMPLETION_STATUS_SUCCESS) {
290                 CERROR("Rx from %s failed: %d\n", 
291                        libcfs_nid2str(conn->ibc_peer->ibp_nid), e->status);
292                 goto failed;
293         }
294
295         LASSERT (e->bytes_transferred >= 0);
296         rx->rx_nob = e->bytes_transferred;
297         mb();
298
299         rc = kibnal_unpack_msg(msg, conn->ibc_version, rx->rx_nob);
300         if (rc != 0) {
301                 CERROR ("Error %d unpacking rx from %s\n",
302                         rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
303                 goto failed;
304         }
305
306         if (!lnet_ptlcompat_matchnid(conn->ibc_peer->ibp_nid,
307                                      msg->ibm_srcnid) ||
308             !lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
309                                      msg->ibm_dstnid) ||
310             msg->ibm_srcstamp != conn->ibc_incarnation ||
311             msg->ibm_dststamp != kibnal_data.kib_incarnation) {
312                 CERROR ("Stale rx from %s\n",
313                         libcfs_nid2str(conn->ibc_peer->ibp_nid));
314                 err = -ESTALE;
315                 goto failed;
316         }
317
318         /* Have I received credits that will let me send? */
319         credits = msg->ibm_credits;
320         if (credits != 0) {
321                 spin_lock_irqsave(&conn->ibc_lock, flags);
322                 conn->ibc_credits += credits;
323                 spin_unlock_irqrestore(&conn->ibc_lock, flags);
324                 
325                 kibnal_check_sends(conn);
326         }
327
328         switch (msg->ibm_type) {
329         case IBNAL_MSG_NOOP:
330                 kibnal_post_rx (rx, 1, 0);
331                 return;
332
333         case IBNAL_MSG_IMMEDIATE:
334                 break;
335                 
336         case IBNAL_MSG_PUT_RDMA:
337         case IBNAL_MSG_GET_RDMA:
338                 CDEBUG(D_NET, "%d RDMA: cookie "LPX64", key %x, addr "LPX64", nob %d\n",
339                        msg->ibm_type, msg->ibm_u.rdma.ibrm_cookie,
340                        msg->ibm_u.rdma.ibrm_desc.rd_key,
341                        msg->ibm_u.rdma.ibrm_desc.rd_addr,
342                        msg->ibm_u.rdma.ibrm_desc.rd_nob);
343                 break;
344                 
345         case IBNAL_MSG_PUT_DONE:
346         case IBNAL_MSG_GET_DONE:
347                 CDEBUG(D_NET, "%d DONE: cookie "LPX64", status %d\n",
348                        msg->ibm_type, msg->ibm_u.completion.ibcm_cookie,
349                        msg->ibm_u.completion.ibcm_status);
350
351                 kibnal_complete_passive_rdma (conn, 
352                                               msg->ibm_u.completion.ibcm_cookie,
353                                               msg->ibm_u.completion.ibcm_status);
354
355                 if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {
356                         kibnal_post_rx (rx, 1, 0);
357                 } else {
358                         /* this reply buffer was pre-reserved */
359                         kibnal_post_rx (rx, 0, 1);
360                 }
361                 return;
362                         
363         default:
364                 CERROR ("Bad msg type %x from %s\n",
365                         msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
366                 goto failed;
367         }
368
369         kibnal_peer_alive(conn->ibc_peer);
370
371         /* schedule for kibnal_rx() in thread context */
372         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
373         
374         list_add_tail (&rx->rx_list, &kibnal_data.kib_sched_rxq);
375         wake_up (&kibnal_data.kib_sched_waitq);
376         
377         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
378         return;
379         
380  failed:
381         CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
382         kibnal_close_conn(conn, err);
383
384         /* Don't re-post rx & drop its ref on conn */
385         kibnal_conn_decref(conn);
386 }
387
388 void
389 kibnal_rx (kib_rx_t *rx)
390 {
391         int          rc = 0;
392         kib_msg_t   *msg = rx->rx_msg;
393
394         switch (msg->ibm_type) {
395         case IBNAL_MSG_GET_RDMA:
396                 rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.rdma.ibrm_hdr,
397                                 msg->ibm_srcnid, rx, 1);
398                 break;
399                 
400         case IBNAL_MSG_PUT_RDMA:
401                 rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.rdma.ibrm_hdr,
402                                 msg->ibm_srcnid, rx, 1);
403                 break;
404
405         case IBNAL_MSG_IMMEDIATE:
406                 rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.immediate.ibim_hdr,
407                                 msg->ibm_srcnid, rx, 0);
408                 break;
409
410         default:
411                 LBUG();
412                 break;
413         }
414
415         if (rc < 0) {
416                 kibnal_close_conn(rx->rx_conn, rc);
417                 kibnal_post_rx (rx, 1, 0);
418         }
419 }
420
#if 0
/*
 * Translate kernel virtual address 'vaddr' to a physical address.
 * (Compiled out.)
 *
 * Addresses in the vmalloc range (and, with CONFIG_HIGHMEM, in the pkmap
 * range) are resolved with vmalloc_to_page(); everything else goes
 * through virt_to_page().  Returns 0 and stores the result in *physp, or
 * -EFAULT if no valid page backs the address.
 */
int
kibnal_kvaddr_to_phys (unsigned long vaddr, __u64 *physp)
{
        struct page *pg;
        int          walk_tables;

        walk_tables = (vaddr >= VMALLOC_START &&
                       vaddr < VMALLOC_END);
#if CONFIG_HIGHMEM
        /* in 2.4 vmalloc_to_page() just walks the page tables, so it
         * serves for pkmap addresses too */
        if (!walk_tables)
                walk_tables = (vaddr >= PKMAP_BASE &&
                               vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE));
#endif
        if (walk_tables)
                pg = vmalloc_to_page ((void *)vaddr);
        else
                pg = virt_to_page (vaddr);

        if (pg == NULL || !VALID_PAGE (pg))
                return (-EFAULT);

        *physp = lnet_page2phys(pg) + (vaddr & (PAGE_SIZE - 1));
        return (0);
}
#endif
447
448 int
449 kibnal_map_iov (kib_tx_t *tx, int access,
450                 unsigned int niov, struct iovec *iov, int offset, int nob)
451                  
452 {
453         void   *vaddr;
454         int     rc;
455
456         LASSERT (nob > 0);
457         LASSERT (niov > 0);
458         LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
459
460         while (offset >= iov->iov_len) {
461                 offset -= iov->iov_len;
462                 niov--;
463                 iov++;
464                 LASSERT (niov > 0);
465         }
466
467         if (nob > iov->iov_len - offset) {
468                 CERROR ("Can't map multiple vaddr fragments\n");
469                 return (-EMSGSIZE);
470         }
471
472         vaddr = (void *)(((unsigned long)iov->iov_base) + offset);
473         tx->tx_md.md_addr = (__u64)((unsigned long)vaddr);
474
475         rc = ib_memory_register (kibnal_data.kib_pd,
476                                  vaddr, nob,
477                                  access,
478                                  &tx->tx_md.md_handle.mr,
479                                  &tx->tx_md.md_lkey,
480                                  &tx->tx_md.md_rkey);
481         
482         if (rc != 0) {
483                 CERROR ("Can't map vaddr: %d\n", rc);
484                 return (rc);
485         }
486
487         tx->tx_mapped = KIB_TX_MAPPED;
488         return (0);
489 }
490
491 int
492 kibnal_map_kiov (kib_tx_t *tx, int access,
493                   int nkiov, lnet_kiov_t *kiov,
494                   int offset, int nob)
495 {
496 #if IBNAL_FMR
497         __u64                      *phys;
498         const int                   mapped = KIB_TX_MAPPED_FMR;
499 #else
500         struct ib_physical_buffer  *phys;
501         const int                   mapped = KIB_TX_MAPPED;
502 #endif
503         int                         page_offset;
504         int                         nphys;
505         int                         resid;
506         int                         phys_size;
507         int                         rc;
508
509         CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
510
511         LASSERT (nob > 0);
512         LASSERT (nkiov > 0);
513         LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
514
515         while (offset >= kiov->kiov_len) {
516                 offset -= kiov->kiov_len;
517                 nkiov--;
518                 kiov++;
519                 LASSERT (nkiov > 0);
520         }
521
522         phys_size = nkiov * sizeof (*phys);
523         LIBCFS_ALLOC(phys, phys_size);
524         if (phys == NULL) {
525                 CERROR ("Can't allocate tmp phys\n");
526                 return (-ENOMEM);
527         }
528
529         page_offset = kiov->kiov_offset + offset;
530 #if IBNAL_FMR
531         phys[0] = lnet_page2phys(kiov->kiov_page);
532 #else
533         phys[0].address = lnet_page2phys(kiov->kiov_page);
534         phys[0].size = PAGE_SIZE;
535 #endif
536         nphys = 1;
537         resid = nob - (kiov->kiov_len - offset);
538
539         while (resid > 0) {
540                 kiov++;
541                 nkiov--;
542                 LASSERT (nkiov > 0);
543
544                 if (kiov->kiov_offset != 0 ||
545                     ((resid > PAGE_SIZE) && 
546                      kiov->kiov_len < PAGE_SIZE)) {
547                         int i;
548                         /* Can't have gaps */
549                         CERROR ("Can't make payload contiguous in I/O VM:"
550                                 "page %d, offset %d, len %d \n", nphys, 
551                                 kiov->kiov_offset, kiov->kiov_len);
552
553                         for (i = -nphys; i < nkiov; i++) 
554                         {
555                                 CERROR("kiov[%d] %p +%d for %d\n",
556                                        i, kiov[i].kiov_page, kiov[i].kiov_offset, kiov[i].kiov_len);
557                         }
558                         
559                         rc = -EINVAL;
560                         goto out;
561                 }
562
563                 if (nphys == LNET_MAX_IOV) {
564                         CERROR ("payload too big (%d)\n", nphys);
565                         rc = -EMSGSIZE;
566                         goto out;
567                 }
568
569                 LASSERT (nphys * sizeof (*phys) < phys_size);
570 #if IBNAL_FMR
571                 phys[nphys] = lnet_page2phys(kiov->kiov_page);
572 #else
573                 phys[nphys].address = lnet_page2phys(kiov->kiov_page);
574                 phys[nphys].size = PAGE_SIZE;
575 #endif
576                 nphys++;
577
578                 resid -= PAGE_SIZE;
579         }
580
581         tx->tx_md.md_addr = IBNAL_RDMA_BASE;
582
583 #if IBNAL_FMR
584         rc = ib_fmr_register_physical (kibnal_data.kib_fmr_pool,
585                                        phys, nphys,
586                                        &tx->tx_md.md_addr,
587                                        page_offset,
588                                        &tx->tx_md.md_handle.fmr,
589                                        &tx->tx_md.md_lkey,
590                                        &tx->tx_md.md_rkey);
591 #else
592         rc = ib_memory_register_physical (kibnal_data.kib_pd,
593                                           phys, nphys,
594                                           &tx->tx_md.md_addr,
595                                           nob, page_offset,
596                                           access,
597                                           &tx->tx_md.md_handle.mr,
598                                           &tx->tx_md.md_lkey,
599                                           &tx->tx_md.md_rkey);
600 #endif
601         if (rc == 0) {
602                 CDEBUG(D_NET, "Mapped %d pages %d bytes @ offset %d: lkey %x, rkey %x\n",
603                        nphys, nob, page_offset, tx->tx_md.md_lkey, tx->tx_md.md_rkey);
604                 tx->tx_mapped = mapped;
605         } else {
606                 CERROR ("Can't map phys: %d\n", rc);
607                 rc = -EFAULT;
608         }
609
610  out:
611         LIBCFS_FREE(phys, phys_size);
612         return (rc);
613 }
614
615 kib_conn_t *
616 kibnal_find_conn_locked (kib_peer_t *peer)
617 {
618         struct list_head *tmp;
619
620         /* just return the first connection */
621         list_for_each (tmp, &peer->ibp_conns) {
622                 return (list_entry(tmp, kib_conn_t, ibc_list));
623         }
624
625         return (NULL);
626 }
627
628 void
629 kibnal_check_sends (kib_conn_t *conn)
630 {
631         unsigned long   flags;
632         kib_tx_t       *tx;
633         int             rc;
634         int             i;
635         int             consume_credit;
636         int             done;
637         int             nwork;
638
639         spin_lock_irqsave (&conn->ibc_lock, flags);
640
641         LASSERT (conn->ibc_nsends_posted <= IBNAL_RX_MSGS);
642         LASSERT (conn->ibc_reserved_credits >= 0);
643
644         while (conn->ibc_reserved_credits > 0 &&
645                !list_empty(&conn->ibc_tx_queue_rsrvd)) {
646                 LASSERT (conn->ibc_version !=
647                          IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
648                 tx = list_entry(conn->ibc_tx_queue_rsrvd.next,
649                                 kib_tx_t, tx_list);
650                 list_del(&tx->tx_list);
651                 list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
652                 conn->ibc_reserved_credits--;
653         }
654
655         if (list_empty(&conn->ibc_tx_queue) &&
656             list_empty(&conn->ibc_tx_queue_nocred) &&
657             (conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER ||
658              kibnal_send_keepalive(conn))) {
659                 spin_unlock_irqrestore(&conn->ibc_lock, flags);
660                 
661                 tx = kibnal_get_idle_tx();
662                 if (tx != NULL)
663                         kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);
664
665                 spin_lock_irqsave(&conn->ibc_lock, flags);
666                 
667                 if (tx != NULL)
668                         kibnal_queue_tx_locked(tx, conn);
669         }
670
671         for (;;) {
672                 if (!list_empty(&conn->ibc_tx_queue_nocred)) {
673                         LASSERT (conn->ibc_version !=
674                                  IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
675                         tx = list_entry(conn->ibc_tx_queue_nocred.next,
676                                         kib_tx_t, tx_list);
677                         consume_credit = 0;
678                 } else if (!list_empty (&conn->ibc_tx_queue)) {
679                         tx = list_entry (conn->ibc_tx_queue.next, 
680                                          kib_tx_t, tx_list);
681                         consume_credit = 1;
682                 } else {
683                         /* nothing waiting */
684                         break;
685                 }
686
687                 /* We rely on this for QP sizing */
688                 LASSERT (tx->tx_nsp > 0 && tx->tx_nsp <= 2);
689
690                 LASSERT (conn->ibc_outstanding_credits >= 0);
691                 LASSERT (conn->ibc_outstanding_credits <= IBNAL_MSG_QUEUE_SIZE);
692                 LASSERT (conn->ibc_credits >= 0);
693                 LASSERT (conn->ibc_credits <= IBNAL_MSG_QUEUE_SIZE);
694
695                 /* Not on ibc_rdma_queue */
696                 LASSERT (!tx->tx_passive_rdma_wait);
697
698                 if (conn->ibc_nsends_posted == IBNAL_RX_MSGS)
699                         break;
700
701                 if (consume_credit) {
702                         if (conn->ibc_credits == 0)     /* no credits */
703                                 break;
704                 
705                         if (conn->ibc_credits == 1 &&   /* last credit reserved for */
706                             conn->ibc_outstanding_credits == 0) /* giving back credits */
707                                 break;
708                 }
709                 
710                 list_del (&tx->tx_list);
711
712                 if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
713                     (!list_empty(&conn->ibc_tx_queue) ||
714                      !list_empty(&conn->ibc_tx_queue_nocred) ||
715                      (conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER &&
716                       !kibnal_send_keepalive(conn)))) {
717                         /* redundant NOOP */
718                         spin_unlock_irqrestore(&conn->ibc_lock, flags);
719                         kibnal_tx_done(tx);
720                         spin_lock_irqsave(&conn->ibc_lock, flags);
721                         continue;
722                 }
723
724                 kibnal_pack_msg(tx->tx_msg, conn->ibc_version,
725                                 conn->ibc_outstanding_credits,
726                                 conn->ibc_peer->ibp_nid, conn->ibc_incarnation);
727
728                 conn->ibc_outstanding_credits = 0;
729                 conn->ibc_nsends_posted++;
730                 if (consume_credit)
731                         conn->ibc_credits--;
732
733                 tx->tx_sending = tx->tx_nsp;
734                 tx->tx_passive_rdma_wait = tx->tx_passive_rdma;
735                 list_add (&tx->tx_list, &conn->ibc_active_txs);
736
737                 spin_unlock_irqrestore (&conn->ibc_lock, flags);
738
739                 /* NB the gap between removing tx from the queue and sending it
740                  * allows message re-ordering to occur */
741
742                 LASSERT (tx->tx_nsp > 0);
743
744                 rc = -ECONNABORTED;
745                 nwork = 0;
746                 if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
747                         tx->tx_status = 0;
748                         /* Driver only accepts 1 item at a time */
749                         for (i = 0; i < tx->tx_nsp; i++) {
750                                 rc = kibnal_ib_send(conn->ibc_qp, &tx->tx_sp[i]);
751                                 if (rc != 0)
752                                         break;
753                                 nwork++;
754                         }
755                 }
756
757                 conn->ibc_last_send = jiffies;
758
759                 spin_lock_irqsave (&conn->ibc_lock, flags);
760                 if (rc != 0) {
761                         /* NB credits are transferred in the actual
762                          * message, which can only be the last work item */
763                         conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
764                         if (consume_credit)
765                                 conn->ibc_credits++;
766                         conn->ibc_nsends_posted--;
767
768                         tx->tx_status = rc;
769                         tx->tx_passive_rdma_wait = 0;
770                         tx->tx_sending -= tx->tx_nsp - nwork;
771
772                         done = (tx->tx_sending == 0);
773                         if (done)
774                                 list_del (&tx->tx_list);
775                         
776                         spin_unlock_irqrestore (&conn->ibc_lock, flags);
777                         
778                         if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
779                                 CERROR ("Error %d posting transmit to %s\n", 
780                                         rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
781                         else
782                                 CDEBUG (D_NET, "Error %d posting transmit to %s\n",
783                                         rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
784
785                         kibnal_close_conn (conn, rc);
786
787                         if (done)
788                                 kibnal_tx_done (tx);
789                         return;
790                 }
791                 
792         }
793
794         spin_unlock_irqrestore (&conn->ibc_lock, flags);
795 }
796
797 void
798 kibnal_tx_callback (struct ib_cq_entry *e)
799 {
800         kib_tx_t     *tx = (kib_tx_t *)kibnal_wreqid2ptr(e->work_request_id);
801         kib_conn_t   *conn;
802         unsigned long flags;
803         int           idle;
804
805         conn = tx->tx_conn;
806         LASSERT (conn != NULL);
807         LASSERT (tx->tx_sending != 0);
808
809         spin_lock_irqsave(&conn->ibc_lock, flags);
810
811         CDEBUG(D_NET, "conn %p tx %p [%d/%d]: %d\n", conn, tx,
812                tx->tx_nsp - tx->tx_sending, tx->tx_nsp,
813                e->status);
814
815         /* I could be racing with rdma completion.  Whoever makes 'tx' idle
816          * gets to free it, which also drops its ref on 'conn'.  If it's
817          * not me, then I take an extra ref on conn so it can't disappear
818          * under me. */
819
820         tx->tx_sending--;
821         idle = (tx->tx_sending == 0) &&         /* This is the final callback */
822                (!tx->tx_passive_rdma_wait);     /* Not waiting for RDMA completion */
823         if (idle)
824                 list_del(&tx->tx_list);
825
826         kibnal_conn_addref(conn);
827
828         if (tx->tx_sending == 0)
829                 conn->ibc_nsends_posted--;
830
831         if (e->status != IB_COMPLETION_STATUS_SUCCESS &&
832             tx->tx_status == 0)
833                 tx->tx_status = -ECONNABORTED;
834                 
835         spin_unlock_irqrestore(&conn->ibc_lock, flags);
836
837         if (idle)
838                 kibnal_tx_done (tx);
839
840         if (e->status != IB_COMPLETION_STATUS_SUCCESS) {
841                 CDEBUG (D_NETERROR, "Tx completion to %s failed: %d\n", 
842                         libcfs_nid2str(conn->ibc_peer->ibp_nid), e->status);
843                 kibnal_close_conn (conn, -ENETDOWN);
844         } else {
845                 kibnal_peer_alive(conn->ibc_peer);
846                 /* can I shovel some more sends out the door? */
847                 kibnal_check_sends(conn);
848         }
849
850         kibnal_conn_decref(conn);
851 }
852
853 void
854 kibnal_callback (ib_cq_t *cq, struct ib_cq_entry *e, void *arg)
855 {
856         if (kibnal_wreqid_is_rx(e->work_request_id))
857                 kibnal_rx_callback (e);
858         else
859                 kibnal_tx_callback (e);
860 }
861
862 void
863 kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob)
864 {
865         struct ib_gather_scatter *gl = &tx->tx_gl[tx->tx_nsp];
866         struct ib_send_param     *sp = &tx->tx_sp[tx->tx_nsp];
867         int                       fence;
868         int                       nob = offsetof (kib_msg_t, ibm_u) + body_nob;
869
870         LASSERT (tx->tx_nsp >= 0 && 
871                  tx->tx_nsp < sizeof(tx->tx_sp)/sizeof(tx->tx_sp[0]));
872         LASSERT (nob <= IBNAL_MSG_SIZE);
873
874         kibnal_init_msg(tx->tx_msg, type, body_nob);
875
876         /* Fence the message if it's bundled with an RDMA read */
877         fence = (tx->tx_nsp > 0) &&
878                 (type == IBNAL_MSG_PUT_DONE);
879
880         *gl = (struct ib_gather_scatter) {
881                 .address = tx->tx_vaddr,
882                 .length  = nob,
883                 .key     = kibnal_data.kib_tx_pages->ibp_lkey,
884         };
885
886         /* NB If this is an RDMA read, the completion message must wait for
887          * the RDMA to complete.  Sends wait for previous RDMA writes
888          * anyway... */
889         *sp = (struct ib_send_param) {
890                 .work_request_id      = kibnal_ptr2wreqid(tx, 0),
891                 .op                   = IB_OP_SEND,
892                 .gather_list          = gl,
893                 .num_gather_entries   = 1,
894                 .device_specific      = NULL,
895                 .solicited_event      = 1,
896                 .signaled             = 1,
897                 .immediate_data_valid = 0,
898                 .fence                = fence,
899                 .inline_data          = 0,
900         };
901
902         tx->tx_nsp++;
903 }
904
905 void
906 kibnal_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
907 {
908         unsigned long         flags;
909
910         spin_lock_irqsave(&conn->ibc_lock, flags);
911
912         kibnal_queue_tx_locked (tx, conn);
913         
914         spin_unlock_irqrestore(&conn->ibc_lock, flags);
915         
916         kibnal_check_sends(conn);
917 }
918
919 void
920 kibnal_schedule_active_connect_locked (kib_peer_t *peer)
921 {
922         /* Called with exclusive kib_global_lock */
923
924         peer->ibp_connecting++;
925         kibnal_peer_addref(peer); /* extra ref for connd */
926         
927         spin_lock (&kibnal_data.kib_connd_lock);
928         
929         LASSERT (list_empty(&peer->ibp_connd_list));
930         list_add_tail (&peer->ibp_connd_list,
931                        &kibnal_data.kib_connd_peers);
932         wake_up (&kibnal_data.kib_connd_waitq);
933         
934         spin_unlock (&kibnal_data.kib_connd_lock);
935 }
936
937 void
938 kibnal_launch_tx (kib_tx_t *tx, lnet_nid_t nid)
939 {
940         unsigned long    flags;
941         kib_peer_t      *peer;
942         kib_conn_t      *conn;
943         int              retry;
944         int              rc;
945         rwlock_t        *g_lock = &kibnal_data.kib_global_lock;
946
947         /* If I get here, I've committed to send, so I complete the tx with
948          * failure on any problems */
949         
950         LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
951         LASSERT (tx->tx_nsp > 0);               /* work items have been set up */
952
953         for (retry = 0; ; retry = 1) {
954                 read_lock_irqsave(g_lock, flags);
955         
956                 peer = kibnal_find_peer_locked (nid);
957                 if (peer != NULL) {
958                         conn = kibnal_find_conn_locked (peer);
959                         if (conn != NULL) {
960                                 kibnal_conn_addref(conn); /* 1 ref for me...*/
961                                 read_unlock_irqrestore(g_lock, flags);
962                 
963                                 kibnal_queue_tx (tx, conn);
964                                 kibnal_conn_decref(conn); /* ...until here */
965                                 return;
966                         }
967                 }
968                 
969                 /* Making one or more connections; I'll need a write lock... */
970                 read_unlock(g_lock);
971                 write_lock(g_lock);
972
973                 peer = kibnal_find_peer_locked (nid);
974                 if (peer != NULL)
975                         break;
976                 
977                 write_unlock_irqrestore (g_lock, flags);
978
979                 if (retry) {
980                         CERROR("Can't find peer %s\n", libcfs_nid2str(nid));
981                         tx->tx_status = -EHOSTUNREACH;
982                         kibnal_tx_done (tx);
983                         return;
984                 }
985
986                 rc = kibnal_add_persistent_peer(nid, LNET_NIDADDR(nid),
987                                                 lnet_acceptor_port());
988                 if (rc != 0) {
989                         CERROR("Can't add peer %s: %d\n",
990                                libcfs_nid2str(nid), rc);
991                         tx->tx_status = rc;
992                         kibnal_tx_done(tx);
993                         return;
994                 }
995         }
996
997         conn = kibnal_find_conn_locked (peer);
998         if (conn != NULL) {
999                 /* Connection exists; queue message on it */
1000                 kibnal_conn_addref(conn);       /* +1 ref from me... */
1001                 write_unlock_irqrestore (g_lock, flags);
1002                 
1003                 kibnal_queue_tx (tx, conn);
1004                 kibnal_conn_decref(conn);       /* ...until here */
1005                 return;
1006         }
1007
1008         if (peer->ibp_connecting == 0 &&
1009             peer->ibp_accepting == 0) {
1010                 if (!(peer->ibp_reconnect_interval == 0 || /* first attempt */
1011                       time_after_eq(jiffies, peer->ibp_reconnect_time))) {
1012                         write_unlock_irqrestore (g_lock, flags);
1013                         tx->tx_status = -EHOSTUNREACH;
1014                         kibnal_tx_done (tx);
1015                         return;
1016                 }
1017         
1018                 kibnal_schedule_active_connect_locked(peer);
1019         }
1020         
1021         /* A connection is being established; queue the message... */
1022         list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);
1023
1024         write_unlock_irqrestore (g_lock, flags);
1025 }
1026
1027 void
1028 kibnal_txlist_done (struct list_head *txlist, int status)
1029 {
1030         kib_tx_t *tx;
1031
1032         while (!list_empty(txlist)) {
1033                 tx = list_entry (txlist->next, kib_tx_t, tx_list);
1034
1035                 list_del (&tx->tx_list);
1036                 /* complete now */
1037                 tx->tx_status = status;
1038                 kibnal_tx_done (tx);
1039         }
1040 }
1041
1042 int
1043 kibnal_start_passive_rdma (int type, lnet_msg_t *lntmsg,
1044                            int niov, struct iovec *iov, lnet_kiov_t *kiov,
1045                            int nob)
1046 {
1047         lnet_nid_t  nid = lntmsg->msg_target.nid;
1048         kib_tx_t   *tx;
1049         kib_msg_t  *ibmsg;
1050         int         rc;
1051         int         access;
1052         
1053         LASSERT (type == IBNAL_MSG_PUT_RDMA || 
1054                  type == IBNAL_MSG_GET_RDMA);
1055         LASSERT (nob > 0);
1056         LASSERT (!in_interrupt());              /* Mapping could block */
1057
1058         if (type == IBNAL_MSG_PUT_RDMA) {
1059                 access = IB_ACCESS_REMOTE_READ;
1060         } else {
1061                 access = IB_ACCESS_REMOTE_WRITE |
1062                          IB_ACCESS_LOCAL_WRITE;
1063         }
1064
1065         tx = kibnal_get_idle_tx ();
1066         if (tx == NULL) {
1067                 CERROR("Can't allocate %s txd for %s\n",
1068                        (type == IBNAL_MSG_PUT_RDMA) ? "PUT/REPLY" : "GET",
1069                        libcfs_nid2str(nid));
1070                 return -ENOMEM;
1071         }
1072
1073         
1074         if (iov != NULL) 
1075                 rc = kibnal_map_iov (tx, access, niov, iov, 0, nob);
1076         else
1077                 rc = kibnal_map_kiov (tx, access, niov, kiov, 0, nob);
1078
1079         if (rc != 0) {
1080                 CERROR ("Can't map RDMA for %s: %d\n", 
1081                         libcfs_nid2str(nid), rc);
1082                 goto failed;
1083         }
1084         
1085         if (type == IBNAL_MSG_GET_RDMA) {
1086                 /* reply gets finalized when tx completes */
1087                 tx->tx_lntmsg[1] = lnet_create_reply_msg(kibnal_data.kib_ni, 
1088                                                          lntmsg);
1089                 if (tx->tx_lntmsg[1] == NULL) {
1090                         CERROR ("Can't create reply for GET -> %s\n",
1091                                 libcfs_nid2str(nid));
1092                         rc = -ENOMEM;
1093                         goto failed;
1094                 }
1095         }
1096         
1097         tx->tx_passive_rdma = 1;
1098
1099         ibmsg = tx->tx_msg;
1100
1101         ibmsg->ibm_u.rdma.ibrm_hdr = lntmsg->msg_hdr;
1102         ibmsg->ibm_u.rdma.ibrm_cookie = tx->tx_passive_rdma_cookie;
1103         ibmsg->ibm_u.rdma.ibrm_desc.rd_key = tx->tx_md.md_rkey;
1104         ibmsg->ibm_u.rdma.ibrm_desc.rd_addr = tx->tx_md.md_addr;
1105         ibmsg->ibm_u.rdma.ibrm_desc.rd_nob = nob;
1106
1107         kibnal_init_tx_msg (tx, type, sizeof (kib_rdma_msg_t));
1108
1109         CDEBUG(D_NET, "Passive: %p cookie "LPX64", key %x, addr "
1110                LPX64", nob %d\n",
1111                tx, tx->tx_passive_rdma_cookie, tx->tx_md.md_rkey,
1112                tx->tx_md.md_addr, nob);
1113         
1114         /* lntmsg gets finalized when tx completes. */
1115         tx->tx_lntmsg[0] = lntmsg;
1116
1117         kibnal_launch_tx(tx, nid);
1118         return (0);
1119
1120  failed:
1121         tx->tx_status = rc;
1122         kibnal_tx_done (tx);
1123         return (-EIO);
1124 }
1125
1126 void
1127 kibnal_start_active_rdma (int type, int status,
1128                           kib_rx_t *rx, lnet_msg_t *lntmsg, 
1129                           unsigned int niov,
1130                           struct iovec *iov, lnet_kiov_t *kiov,
1131                           int offset, int nob)
1132 {
1133         kib_msg_t    *rxmsg = rx->rx_msg;
1134         kib_msg_t    *txmsg;
1135         kib_tx_t     *tx;
1136         int           access;
1137         int           rdma_op;
1138         int           rc;
1139
1140         CDEBUG(D_NET, "type %d, status %d, niov %d, offset %d, nob %d\n",
1141                type, status, niov, offset, nob);
1142
1143         /* Called by scheduler */
1144         LASSERT (!in_interrupt ());
1145
1146         /* Either all pages or all vaddrs */
1147         LASSERT (!(kiov != NULL && iov != NULL));
1148
1149         /* No data if we're completing with failure */
1150         LASSERT (status == 0 || nob == 0);
1151
1152         LASSERT (type == IBNAL_MSG_GET_DONE ||
1153                  type == IBNAL_MSG_PUT_DONE);
1154
1155         if (type == IBNAL_MSG_GET_DONE) {
1156                 access   = 0;
1157                 rdma_op  = IB_OP_RDMA_WRITE;
1158                 LASSERT (rxmsg->ibm_type == IBNAL_MSG_GET_RDMA);
1159         } else {
1160                 access   = IB_ACCESS_LOCAL_WRITE;
1161                 rdma_op  = IB_OP_RDMA_READ;
1162                 LASSERT (rxmsg->ibm_type == IBNAL_MSG_PUT_RDMA);
1163         }
1164
1165         tx = kibnal_get_idle_tx ();
1166         if (tx == NULL) {
1167                 CERROR ("tx descs exhausted on RDMA from %s"
1168                         " completing locally with failure\n",
1169                         libcfs_nid2str(rx->rx_conn->ibc_peer->ibp_nid));
1170                 lnet_finalize (kibnal_data.kib_ni, lntmsg, -ENOMEM);
1171                 return;
1172         }
1173         LASSERT (tx->tx_nsp == 0);
1174                         
1175         if (nob != 0) {
1176                 /* We actually need to transfer some data (the transfer
1177                  * size could get truncated to zero when the incoming
1178                  * message is matched) */
1179
1180                 if (kiov != NULL)
1181                         rc = kibnal_map_kiov (tx, access,
1182                                               niov, kiov, offset, nob);
1183                 else
1184                         rc = kibnal_map_iov (tx, access,
1185                                              niov, iov, offset, nob);
1186                 
1187                 if (rc != 0) {
1188                         CERROR ("Can't map RDMA -> %s: %d\n", 
1189                                 libcfs_nid2str(rx->rx_conn->ibc_peer->ibp_nid), 
1190                                 rc);
1191                         /* We'll skip the RDMA and complete with failure. */
1192                         status = rc;
1193                         nob = 0;
1194                 } else {
1195                         tx->tx_gl[0] = (struct ib_gather_scatter) {
1196                                 .address = tx->tx_md.md_addr,
1197                                 .length  = nob,
1198                                 .key     = tx->tx_md.md_lkey,
1199                         };
1200                 
1201                         tx->tx_sp[0] = (struct ib_send_param) {
1202                                 .work_request_id      = kibnal_ptr2wreqid(tx, 0),
1203                                 .op                   = rdma_op,
1204                                 .gather_list          = &tx->tx_gl[0],
1205                                 .num_gather_entries   = 1,
1206                                 .remote_address       = rxmsg->ibm_u.rdma.ibrm_desc.rd_addr,
1207                                 .rkey                 = rxmsg->ibm_u.rdma.ibrm_desc.rd_key,
1208                                 .device_specific      = NULL,
1209                                 .solicited_event      = 0,
1210                                 .signaled             = 1,
1211                                 .immediate_data_valid = 0,
1212                                 .fence                = 0,
1213                                 .inline_data          = 0,
1214                         };
1215
1216                         tx->tx_nsp = 1;
1217                 }
1218         }
1219
1220         txmsg = tx->tx_msg;
1221
1222         txmsg->ibm_u.completion.ibcm_cookie = rxmsg->ibm_u.rdma.ibrm_cookie;
1223         txmsg->ibm_u.completion.ibcm_status = status;
1224         
1225         kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));
1226
1227         if (status == 0 && nob != 0) {
1228                 LASSERT (tx->tx_nsp > 1);
1229                 /* RDMA: lntmsg gets finalized when the tx completes.  This
1230                  * is after the completion message has been sent, which in
1231                  * turn is after the RDMA has finished. */
1232                 tx->tx_lntmsg[0] = lntmsg;
1233         } else {
1234                 LASSERT (tx->tx_nsp == 1);
1235                 /* No RDMA: local completion happens now! */
1236                 CDEBUG(D_NET, "No data: immediate completion\n");
1237                 lnet_finalize (kibnal_data.kib_ni, lntmsg,
1238                               status == 0 ? 0 : -EIO);
1239         }
1240
1241         kibnal_queue_tx(tx, rx->rx_conn);
1242 }
1243
1244 int
1245 kibnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1246 {
1247         lnet_hdr_t       *hdr = &lntmsg->msg_hdr; 
1248         int               type = lntmsg->msg_type; 
1249         lnet_process_id_t target = lntmsg->msg_target;
1250         int               target_is_router = lntmsg->msg_target_is_router;
1251         int               routing = lntmsg->msg_routing;
1252         unsigned int      payload_niov = lntmsg->msg_niov; 
1253         struct iovec     *payload_iov = lntmsg->msg_iov; 
1254         lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
1255         unsigned int      payload_offset = lntmsg->msg_offset;
1256         unsigned int      payload_nob = lntmsg->msg_len;
1257         kib_msg_t        *ibmsg;
1258         kib_tx_t         *tx;
1259         int               nob;
1260
1261         /* NB 'private' is different depending on what we're sending.... */
1262
1263         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1264                payload_nob, payload_niov, libcfs_id2str(target));
1265
1266         LASSERT (payload_nob == 0 || payload_niov > 0);
1267         LASSERT (payload_niov <= LNET_MAX_IOV);
1268
1269         /* Thread context if we're sending payload */
1270         LASSERT (!in_interrupt() || payload_niov == 0);
1271         /* payload is either all vaddrs or all pages */
1272         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1273
1274         switch (type) {
1275         default:
1276                 LBUG();
1277                 return (-EIO);
1278                 
1279         case LNET_MSG_ACK:
1280                 LASSERT (payload_nob == 0);
1281                 break;
1282
1283         case LNET_MSG_GET:
1284                 if (routing || target_is_router)
1285                         break;                  /* send IMMEDIATE */
1286
1287                 /* is the REPLY message too small for RDMA? */
1288                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
1289                 if (nob <= IBNAL_MSG_SIZE)
1290                         break;                  /* send IMMEDIATE */
1291
1292                 if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
1293                         return kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA, lntmsg, 
1294                                                          lntmsg->msg_md->md_niov, 
1295                                                          lntmsg->msg_md->md_iov.iov, NULL,
1296                                                          lntmsg->msg_md->md_length);
1297
1298                 return kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA, lntmsg, 
1299                                                  lntmsg->msg_md->md_niov, 
1300                                                  NULL, lntmsg->msg_md->md_iov.kiov,
1301                                                  lntmsg->msg_md->md_length);
1302
1303         case LNET_MSG_REPLY:
1304         case LNET_MSG_PUT:
1305                 /* Is the payload small enough not to need RDMA? */
1306                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1307                 if (nob <= IBNAL_MSG_SIZE)
1308                         break;                  /* send IMMEDIATE */
1309                 
1310                 return kibnal_start_passive_rdma(IBNAL_MSG_PUT_RDMA, lntmsg,
1311                                                  payload_niov,
1312                                                  payload_iov, payload_kiov,
1313                                                  payload_nob);
1314         }
1315
1316         /* Send IMMEDIATE */
1317
1318         tx = kibnal_get_idle_tx();
1319         if (tx == NULL) {
1320                 CERROR ("Can't send %d to %s: tx descs exhausted%s\n", 
1321                         type, libcfs_nid2str(target.nid), 
1322                         in_interrupt() ? " (intr)" : "");
1323                 return (-ENOMEM);
1324         }
1325
1326         ibmsg = tx->tx_msg;
1327         ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1328
1329         if (payload_kiov != NULL)
1330                 lnet_copy_kiov2flat(IBNAL_MSG_SIZE, ibmsg,
1331                                     offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1332                                     payload_niov, payload_kiov, 
1333                                     payload_offset, payload_nob);
1334         else
1335                 lnet_copy_iov2flat(IBNAL_MSG_SIZE, ibmsg,
1336                                    offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1337                                    payload_niov, payload_iov, 
1338                                    payload_offset, payload_nob);
1339
1340         kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE,
1341                             offsetof(kib_immediate_msg_t, 
1342                                      ibim_payload[payload_nob]));
1343
1344         /* lntmsg gets finalized when tx completes */
1345         tx->tx_lntmsg[0] = lntmsg;
1346
1347         kibnal_launch_tx(tx, target.nid);
1348         return (0);
1349 }
1350
1351 int
1352 kibnal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
1353                    void **new_private)
1354 {
1355         kib_rx_t    *rx = private;
1356         kib_conn_t  *conn = rx->rx_conn;
1357
1358         if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {
1359                 /* Can't block if RDMA completions need normal credits */
1360                 LCONSOLE_ERROR_MSG(0x12a, 
1361                                "Dropping message from %s: no buffers free. "
1362                                "%s is running an old version of LNET that may "
1363                                "deadlock if messages wait for buffers)\n",
1364                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
1365                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
1366                 return -EDEADLK;
1367         }
1368         
1369         *new_private = private;
1370         return 0;
1371 }
1372
1373 int
1374 kibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
1375              int delayed, unsigned int niov,
1376              struct iovec *iov, lnet_kiov_t *kiov,
1377              unsigned int offset, unsigned int mlen, unsigned int rlen)
1378 {
1379         kib_rx_t    *rx = private;
1380         kib_msg_t   *rxmsg = rx->rx_msg;
1381         int          msg_nob;
1382         int          rc = 0;
1383         
1384         LASSERT (mlen <= rlen);
1385         LASSERT (!in_interrupt ());
1386         /* Either all pages or all vaddrs */
1387         LASSERT (!(kiov != NULL && iov != NULL));
1388
1389         switch (rxmsg->ibm_type) {
1390         default:
1391                 LBUG();
1392
1393         case IBNAL_MSG_IMMEDIATE:
1394                 msg_nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
1395                 if (msg_nob > rx->rx_nob) {
1396                         CERROR ("Immediate message from %s too big: %d(%d)\n",
1397                                 libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
1398                                 msg_nob, rx->rx_nob);
1399                         rc = -EPROTO;
1400                         break;
1401                 }
1402
1403                 if (kiov != NULL)
1404                         lnet_copy_flat2kiov(
1405                                 niov, kiov, offset, 
1406                                 IBNAL_MSG_SIZE, rxmsg,
1407                                 offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1408                                 mlen);
1409                 else
1410                         lnet_copy_flat2iov(
1411                                 niov, iov, offset,
1412                                 IBNAL_MSG_SIZE, rxmsg,
1413                                 offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1414                                 mlen);
1415
1416                 lnet_finalize (ni, lntmsg, 0);
1417                 break;
1418
1419         case IBNAL_MSG_GET_RDMA:
1420                 if (lntmsg != NULL) {
1421                         /* GET matched: RDMA lntmsg's payload */
1422                         kibnal_start_active_rdma(IBNAL_MSG_GET_DONE, 0,
1423                                                  rx, lntmsg, 
1424                                                  lntmsg->msg_niov, 
1425                                                  lntmsg->msg_iov, 
1426                                                  lntmsg->msg_kiov,
1427                                                  lntmsg->msg_offset, 
1428                                                  lntmsg->msg_len);
1429                 } else {
1430                         /* GET didn't match anything */
1431                         kibnal_start_active_rdma (IBNAL_MSG_GET_DONE, -ENODATA,
1432                                                   rx, NULL, 0, NULL, NULL, 0, 0);
1433                 }
1434                 break;
1435
1436         case IBNAL_MSG_PUT_RDMA:
1437                 kibnal_start_active_rdma (IBNAL_MSG_PUT_DONE, 0, rx, lntmsg,
1438                                           niov, iov, kiov, offset, mlen);
1439                 break;
1440         }
1441
1442         kibnal_post_rx(rx, 1, 0);
1443         return rc;
1444 }
1445
1446 int
1447 kibnal_thread_start (int (*fn)(void *arg), void *arg)
1448 {
1449         long    pid = kernel_thread (fn, arg, 0);
1450
1451         if (pid < 0)
1452                 return ((int)pid);
1453
1454         atomic_inc (&kibnal_data.kib_nthreads);
1455         return (0);
1456 }
1457
1458 void
1459 kibnal_thread_fini (void)
1460 {
1461         atomic_dec (&kibnal_data.kib_nthreads);
1462 }
1463
1464 void
1465 kibnal_peer_alive (kib_peer_t *peer)
1466 {
1467         /* This is racy, but everyone's only writing cfs_time_current() */
1468         peer->ibp_last_alive = cfs_time_current();
1469         mb();
1470 }
1471
1472 void
1473 kibnal_peer_notify (kib_peer_t *peer)
1474 {
1475         time_t        last_alive = 0;
1476         int           error = 0;
1477         unsigned long flags;
1478         
1479         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1480
1481         if (list_empty(&peer->ibp_conns) &&
1482             peer->ibp_accepting == 0 &&
1483             peer->ibp_connecting == 0 &&
1484             peer->ibp_error != 0) {
1485                 error = peer->ibp_error;
1486                 peer->ibp_error = 0;
1487                 last_alive = cfs_time_current_sec() -
1488                              cfs_duration_sec(cfs_time_current() -
1489                                               peer->ibp_last_alive);
1490         }
1491         
1492         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1493         
1494         if (error != 0)
1495                 lnet_notify(kibnal_data.kib_ni, peer->ibp_nid, 0, last_alive);
1496 }
1497
1498 void
1499 kibnal_close_conn_locked (kib_conn_t *conn, int error)
1500 {
1501         /* This just does the immmediate housekeeping, and schedules the
1502          * connection for the reaper to finish off.
1503          * Caller holds kib_global_lock exclusively in irq context */
1504         kib_peer_t   *peer = conn->ibc_peer;
1505
1506         CDEBUG (error == 0 ? D_NET : D_NETERROR,
1507                 "closing conn to %s: error %d\n", 
1508                 libcfs_nid2str(peer->ibp_nid), error);
1509         
1510         LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED ||
1511                  conn->ibc_state == IBNAL_CONN_CONNECTING);
1512
1513         if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
1514                 /* kib_reaper_conns takes ibc_list's ref */
1515                 list_del (&conn->ibc_list);
1516         } else {
1517                 /* new ref for kib_reaper_conns */
1518                 kibnal_conn_addref(conn);
1519         }
1520         
1521         if (list_empty (&peer->ibp_conns)) {   /* no more conns */
1522                 if (peer->ibp_persistence == 0 && /* non-persistent peer */
1523                     kibnal_peer_active(peer))     /* still in peer table */
1524                         kibnal_unlink_peer_locked (peer);
1525
1526                 peer->ibp_error = error; /* set/clear error on last conn */
1527         }
1528
1529         conn->ibc_state = IBNAL_CONN_DEATHROW;
1530
1531         /* Schedule conn for closing/destruction */
1532         spin_lock (&kibnal_data.kib_reaper_lock);
1533
1534         list_add_tail (&conn->ibc_list, &kibnal_data.kib_reaper_conns);
1535         wake_up (&kibnal_data.kib_reaper_waitq);
1536                 
1537         spin_unlock (&kibnal_data.kib_reaper_lock);
1538 }
1539
1540 int
1541 kibnal_close_conn (kib_conn_t *conn, int why)
1542 {
1543         unsigned long     flags;
1544         int               count = 0;
1545
1546         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1547
1548         LASSERT (conn->ibc_state >= IBNAL_CONN_CONNECTING);
1549         
1550         if (conn->ibc_state <= IBNAL_CONN_ESTABLISHED) {
1551                 count = 1;
1552                 kibnal_close_conn_locked (conn, why);
1553         }
1554         
1555         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1556         return (count);
1557 }
1558
1559 void
1560 kibnal_peer_connect_failed (kib_peer_t *peer, int active, int error)
1561 {
1562         LIST_HEAD        (zombies);
1563         unsigned long     flags;
1564
1565         LASSERT(error != 0);
1566
1567         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1568
1569         if (active) {
1570                 LASSERT (peer->ibp_connecting != 0);
1571                 peer->ibp_connecting--;
1572         } else {
1573                 LASSERT (peer->ibp_accepting != 0);
1574                 peer->ibp_accepting--;
1575         }
1576
1577         if (peer->ibp_connecting != 0 ||
1578             peer->ibp_accepting != 0) {
1579                 /* another connection attempt under way... */
1580                 write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1581                 return;
1582         }
1583
1584         if (list_empty(&peer->ibp_conns)) {
1585                 /* Say when active connection can be re-attempted */
1586                 peer->ibp_reconnect_interval *= 2;
1587                 peer->ibp_reconnect_interval =
1588                         MAX(peer->ibp_reconnect_interval,
1589                             *kibnal_tunables.kib_min_reconnect_interval);
1590                 peer->ibp_reconnect_interval =
1591                         MIN(peer->ibp_reconnect_interval,
1592                             *kibnal_tunables.kib_max_reconnect_interval);
1593                 
1594                 peer->ibp_reconnect_time = jiffies + 
1595                                            peer->ibp_reconnect_interval * HZ;
1596         
1597                 /* Take peer's blocked transmits; I'll complete
1598                  * them with error */
1599                 list_add(&zombies, &peer->ibp_tx_queue);
1600                 list_del_init(&peer->ibp_tx_queue);
1601                 
1602                 if (kibnal_peer_active(peer) &&
1603                     (peer->ibp_persistence == 0)) {
1604                         /* failed connection attempt on non-persistent peer */
1605                         kibnal_unlink_peer_locked (peer);
1606                 }
1607
1608                 peer->ibp_error = error;
1609         } else {
1610                 /* Can't have blocked transmits if there are connections */
1611                 LASSERT (list_empty(&peer->ibp_tx_queue));
1612         }
1613         
1614         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1615
1616         kibnal_peer_notify(peer);
1617         
1618         if (!list_empty (&zombies))
1619                 CDEBUG (D_NETERROR, "Deleting messages for %s: connection failed\n",
1620                         libcfs_nid2str(peer->ibp_nid));
1621
1622         kibnal_txlist_done(&zombies, -EHOSTUNREACH);
1623 }
1624
1625 void
1626 kibnal_connreq_done (kib_conn_t *conn, int active, int status)
1627 {
1628         int               state = conn->ibc_state;
1629         kib_peer_t       *peer = conn->ibc_peer;
1630         kib_tx_t         *tx;
1631         unsigned long     flags;
1632         int               rc;
1633         int               i;
1634
1635         if (conn->ibc_connreq != NULL) {
1636                 LIBCFS_FREE (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
1637                 conn->ibc_connreq = NULL;
1638         }
1639
1640         switch (state) {
1641         case IBNAL_CONN_CONNECTING:
1642                 /* conn has a CM comm_id */
1643                 if (status == 0) {
1644                         /* Install common (active/passive) callback for
1645                          * disconnect/idle notification */
1646                         rc = tsIbCmCallbackModify(conn->ibc_comm_id, 
1647                                                   kibnal_conn_callback,
1648                                                   conn);
1649                         LASSERT (rc == 0);
1650                 } else {
1651                         /* LASSERT (no more CM callbacks) */
1652                         rc = tsIbCmCallbackModify(conn->ibc_comm_id,
1653                                                   kibnal_bad_conn_callback,
1654                                                   conn);
1655                         LASSERT (rc == 0);
1656                 }
1657                 break;
1658                 
1659         case IBNAL_CONN_INIT_QP:
1660                 LASSERT (status != 0);
1661                 break;
1662                 
1663         default:
1664                 LBUG();
1665         }
1666         
1667         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1668
1669         if (active)
1670                 LASSERT (peer->ibp_connecting != 0);
1671         else
1672                 LASSERT (peer->ibp_accepting != 0);
1673         
1674         if (status == 0 &&                      /* connection established */
1675             kibnal_peer_active(peer)) {         /* peer not deleted */
1676
1677                 if (active)
1678                         peer->ibp_connecting--;
1679                 else
1680                         peer->ibp_accepting--;
1681
1682                 conn->ibc_last_send = jiffies;
1683                 conn->ibc_state = IBNAL_CONN_ESTABLISHED;
1684                 kibnal_peer_alive(peer);
1685
1686                 /* +1 ref for ibc_list; caller(== CM)'s ref remains until
1687                  * the IB_CM_IDLE callback */
1688                 kibnal_conn_addref(conn);
1689                 list_add (&conn->ibc_list, &peer->ibp_conns);
1690
1691                 peer->ibp_reconnect_interval = 0; /* OK to reconnect at any time */
1692
1693                 /* post blocked sends to the new connection */
1694                 spin_lock (&conn->ibc_lock);
1695                 
1696                 while (!list_empty (&peer->ibp_tx_queue)) {
1697                         tx = list_entry (peer->ibp_tx_queue.next, 
1698                                          kib_tx_t, tx_list);
1699                         
1700                         list_del (&tx->tx_list);
1701
1702                         kibnal_queue_tx_locked (tx, conn);
1703                 }
1704                 
1705                 spin_unlock (&conn->ibc_lock);
1706
1707                 /* Nuke any dangling conns from a different peer instance... */
1708                 kibnal_close_stale_conns_locked (conn->ibc_peer,
1709                                                  conn->ibc_incarnation);
1710
1711                 write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1712
1713                 /* queue up all the receives */
1714                 for (i = 0; i < IBNAL_RX_MSGS; i++) {
1715                         /* +1 ref for rx desc */
1716                         kibnal_conn_addref(conn);
1717
1718                         CDEBUG(D_NET, "RX[%d] %p->%p - "LPX64"\n",
1719                                i, &conn->ibc_rxs[i], conn->ibc_rxs[i].rx_msg,
1720                                conn->ibc_rxs[i].rx_vaddr);
1721
1722                         kibnal_post_rx (&conn->ibc_rxs[i], 0, 0);
1723                 }
1724
1725                 kibnal_check_sends (conn);
1726                 return;
1727         }
1728
1729         if (status == 0) {
1730                 /* connection established, but peer was deleted.  Schedule for
1731                  * reaper to cm_disconnect... */
1732                 status = -ECONNABORTED;
1733                 kibnal_close_conn_locked (conn, status);
1734         } else {
1735                 /* just waiting for refs to drain */
1736                 conn->ibc_state = IBNAL_CONN_ZOMBIE;
1737         } 
1738
1739         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1740
1741         kibnal_peer_connect_failed (conn->ibc_peer, active, status);
1742 }
1743
1744 int
1745 kibnal_accept_connreq (kib_conn_t **connp, tTS_IB_CM_COMM_ID cid,
1746                        kib_msg_t *msg, int nob)
1747 {
1748         kib_conn_t    *conn;
1749         kib_peer_t    *peer;
1750         kib_peer_t    *peer2;
1751         unsigned long  flags;
1752         int            rc;
1753
1754         rc = kibnal_unpack_msg(msg, 0, nob);
1755         if (rc != 0) {
1756                 CERROR("Can't unpack connreq msg: %d\n", rc);
1757                 return -EPROTO;
1758         }
1759
1760         CDEBUG(D_NET, "connreq from %s\n", libcfs_nid2str(msg->ibm_srcnid));
1761
1762         if (msg->ibm_type != IBNAL_MSG_CONNREQ) {
1763                 CERROR("Unexpected connreq msg type: %x from %s\n",
1764                        msg->ibm_type, libcfs_nid2str(msg->ibm_srcnid));
1765                 return -EPROTO;
1766         }
1767                 
1768         if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
1769                 CERROR("Can't accept %s: bad queue depth %d (%d expected)\n",
1770                        libcfs_nid2str(msg->ibm_srcnid), 
1771                        msg->ibm_u.connparams.ibcp_queue_depth, 
1772                        IBNAL_MSG_QUEUE_SIZE);
1773                 return (-EPROTO);
1774         }
1775         
1776         conn = kibnal_create_conn();
1777         if (conn == NULL)
1778                 return (-ENOMEM);
1779
1780         /* assume 'nid' is a new peer */
1781         rc = kibnal_create_peer(&peer, msg->ibm_srcnid);
1782         if (rc != 0) {
1783                 kibnal_conn_decref(conn);
1784                 return (-ENOMEM);
1785         }
1786         
1787         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1788
1789         if (kibnal_data.kib_nonewpeers) {
1790                 write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1791                 
1792                 CERROR ("Shutdown has started, drop connreq from %s\n",
1793                         libcfs_nid2str(msg->ibm_srcnid));
1794                 kibnal_conn_decref(conn);
1795                 kibnal_peer_decref(peer);
1796                 return -ESHUTDOWN;
1797         }
1798
1799         /* Check I'm the same instance that gave the connection parameters.  
1800          * NB If my incarnation changes after this, the peer will get nuked and
1801          * we'll spot that when the connection is finally added into the peer's
1802          * connlist */
1803         if (!lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
1804                                      msg->ibm_dstnid) ||
1805             msg->ibm_dststamp != kibnal_data.kib_incarnation) {
1806                 write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1807                 
1808                 CERROR("Stale connection params from %s\n",
1809                        libcfs_nid2str(msg->ibm_srcnid));
1810                 kibnal_conn_decref(conn);
1811                 kibnal_peer_decref(peer);
1812                 return -ESTALE;
1813         }
1814
1815         peer2 = kibnal_find_peer_locked(msg->ibm_srcnid);
1816         if (peer2 == NULL) {
1817                 /* Brand new peer */
1818                 LASSERT (peer->ibp_accepting == 0);
1819
1820                 /* peer table takes my ref on peer */
1821                 list_add_tail (&peer->ibp_list,
1822                                kibnal_nid2peerlist(msg->ibm_srcnid));
1823         } else {
1824                 /* tie-break connection race in favour of the higher NID */                
1825                 if (peer2->ibp_connecting != 0 &&
1826                     msg->ibm_srcnid < kibnal_data.kib_ni->ni_nid) {
1827                         write_unlock_irqrestore(&kibnal_data.kib_global_lock,
1828                                                 flags);
1829                         CWARN("Conn race %s\n",
1830                               libcfs_nid2str(peer2->ibp_nid));
1831
1832                         kibnal_conn_decref(conn);
1833                         kibnal_peer_decref(peer);
1834                         return -EALREADY;
1835                 }
1836
1837                 kibnal_peer_decref(peer);
1838                 peer = peer2;
1839         }
1840
1841         /* +1 ref for conn */
1842         kibnal_peer_addref(peer);
1843         peer->ibp_accepting++;
1844
1845         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1846
1847         conn->ibc_peer = peer;
1848         conn->ibc_state = IBNAL_CONN_CONNECTING;
1849         conn->ibc_comm_id = cid;
1850         conn->ibc_incarnation = msg->ibm_srcstamp;
1851         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
1852         conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
1853         conn->ibc_version = msg->ibm_version;
1854
1855         *connp = conn;
1856         return (0);
1857 }
1858
1859 tTS_IB_CM_CALLBACK_RETURN
1860 kibnal_bad_conn_callback (tTS_IB_CM_EVENT event,
1861                           tTS_IB_CM_COMM_ID cid,
1862                           void *param,
1863                           void *arg)
1864 {
1865         CERROR ("Unexpected event %d: conn %p\n", event, arg);
1866         LBUG ();
1867         return TS_IB_CM_CALLBACK_PROCEED;
1868 }
1869
1870 void
1871 kibnal_abort_txs (kib_conn_t *conn, struct list_head *txs)
1872 {
1873         LIST_HEAD        (zombies); 
1874         struct list_head *tmp;
1875         struct list_head *nxt;
1876         kib_tx_t         *tx;
1877         unsigned long     flags;
1878
1879         spin_lock_irqsave (&conn->ibc_lock, flags);
1880
1881         list_for_each_safe (tmp, nxt, txs) {
1882                 tx = list_entry (tmp, kib_tx_t, tx_list);
1883
1884                 if (txs == &conn->ibc_active_txs) {
1885                         LASSERT (tx->tx_passive_rdma ||
1886                                  !tx->tx_passive_rdma_wait);
1887
1888                         LASSERT (tx->tx_passive_rdma_wait ||
1889                                  tx->tx_sending != 0);
1890                 } else {
1891                         LASSERT (!tx->tx_passive_rdma_wait);
1892                         LASSERT (tx->tx_sending == 0);
1893                 }
1894
1895                 tx->tx_status = -ECONNABORTED;
1896                 tx->tx_passive_rdma_wait = 0;
1897
1898                 if (tx->tx_sending == 0) {
1899                         list_del (&tx->tx_list);
1900                         list_add (&tx->tx_list, &zombies);
1901                 }
1902         }
1903         
1904         spin_unlock_irqrestore (&conn->ibc_lock, flags);
1905
1906         kibnal_txlist_done (&zombies, -ECONNABORTED);
1907 }
1908
1909 tTS_IB_CM_CALLBACK_RETURN
1910 kibnal_conn_callback (tTS_IB_CM_EVENT event,
1911                       tTS_IB_CM_COMM_ID cid,
1912                       void *param,
1913                       void *arg)
1914 {
1915         kib_conn_t       *conn = arg;
1916         int               rc;
1917
1918         /* Established Connection Notifier */
1919
1920         switch (event) {
1921         default:
1922                 CDEBUG(D_NETERROR, "Connection %p -> %s ERROR %d\n",
1923                        conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), event);
1924                 kibnal_close_conn (conn, -ECONNABORTED);
1925                 break;
1926                 
1927         case TS_IB_CM_DISCONNECTED:
1928                 CDEBUG(D_NETERROR, "Connection %p -> %s DISCONNECTED.\n",
1929                        conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
1930                 kibnal_close_conn (conn, 0);
1931                 break;
1932
1933         case TS_IB_CM_IDLE:
1934                 CDEBUG(D_NET, "Connection %p -> %s IDLE.\n",
1935                        conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
1936
1937                 /* LASSERT (no further callbacks) */
1938                 rc = tsIbCmCallbackModify(cid, kibnal_bad_conn_callback, conn);
1939                 LASSERT (rc == 0);
1940
1941                 /* NB we wait until the connection has closed before
1942                  * completing outstanding passive RDMAs so we can be sure
1943                  * the network can't touch the mapped memory any more. */
1944
1945                 kibnal_abort_txs(conn, &conn->ibc_tx_queue);
1946                 kibnal_abort_txs(conn, &conn->ibc_tx_queue_rsrvd);
1947                 kibnal_abort_txs(conn, &conn->ibc_tx_queue_nocred);
1948                 kibnal_abort_txs(conn, &conn->ibc_active_txs);
1949                 
1950                 kibnal_conn_decref(conn);        /* Lose CM's ref */
1951                 break;
1952         }
1953
1954         return TS_IB_CM_CALLBACK_PROCEED;
1955 }
1956
1957 tTS_IB_CM_CALLBACK_RETURN
1958 kibnal_passive_conn_callback (tTS_IB_CM_EVENT event,
1959                               tTS_IB_CM_COMM_ID cid,
1960                               void *param,
1961                               void *arg)
1962 {
1963         kib_conn_t  *conn = arg;
1964         int          rc;
1965         
1966         switch (event) {
1967         default:
1968                 if (conn == NULL) {
1969                         /* no connection yet */
1970                         CERROR ("Unexpected event: %d\n", event);
1971                         return TS_IB_CM_CALLBACK_ABORT;
1972                 }
1973                 
1974                 CERROR ("%s event %p -> %s: %d\n",
1975                         (event == TS_IB_CM_IDLE) ? "IDLE" : "Unexpected",
1976                         conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), event);
1977                 kibnal_connreq_done(conn, 0, -ECONNABORTED);
1978                 kibnal_conn_decref(conn); /* drop CM's ref */
1979                 return TS_IB_CM_CALLBACK_ABORT;
1980                 
1981         case TS_IB_CM_REQ_RECEIVED: {
1982                 struct ib_cm_req_received_param *req = param;
1983                 kib_msg_t                       *msg = req->remote_private_data;
1984
1985                 LASSERT (conn == NULL);
1986
1987                 /* Don't really know srcnid until successful unpack */
1988                 CDEBUG(D_NET, "REQ from ?%s?\n", libcfs_nid2str(msg->ibm_srcnid));
1989
1990                 rc = kibnal_accept_connreq(&conn, cid, msg, 
1991                                            req->remote_private_data_len);
1992                 if (rc != 0) {
1993                         CERROR ("Can't accept ?%s?: %d\n",
1994                                 libcfs_nid2str(msg->ibm_srcnid), rc);
1995                         return TS_IB_CM_CALLBACK_ABORT;
1996                 }
1997
1998                 /* update 'arg' for next callback */
1999                 rc = tsIbCmCallbackModify(cid, kibnal_passive_conn_callback, conn);
2000                 LASSERT (rc == 0);
2001
2002                 msg = req->accept_param.reply_private_data;
2003                 kibnal_init_msg(msg, IBNAL_MSG_CONNACK,
2004                                 sizeof(msg->ibm_u.connparams));
2005
2006                 msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
2007
2008                 kibnal_pack_msg(msg, conn->ibc_version, 0, 
2009                                 conn->ibc_peer->ibp_nid, 
2010                                 conn->ibc_incarnation);
2011
2012                 req->accept_param.qp                     = conn->ibc_qp;
2013                 req->accept_param.reply_private_data_len = msg->ibm_nob;
2014                 req->accept_param.responder_resources    = IBNAL_RESPONDER_RESOURCES;
2015                 req->accept_param.initiator_depth        = IBNAL_RESPONDER_RESOURCES;
2016                 req->accept_param.rnr_retry_count        = IBNAL_RNR_RETRY;
2017                 req->accept_param.flow_control           = IBNAL_FLOW_CONTROL;
2018
2019                 CDEBUG(D_NET, "Proceeding\n");
2020                 return TS_IB_CM_CALLBACK_PROCEED; /* CM takes my ref on conn */
2021         }
2022
2023         case TS_IB_CM_ESTABLISHED:
2024                 LASSERT (conn != NULL);
2025                 CWARN("Connection %p -> %s ESTABLISHED.\n",
2026                        conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
2027
2028                 kibnal_connreq_done(conn, 0, 0);
2029                 return TS_IB_CM_CALLBACK_PROCEED;
2030         }
2031 }
2032
2033 tTS_IB_CM_CALLBACK_RETURN
2034 kibnal_active_conn_callback (tTS_IB_CM_EVENT event,
2035                              tTS_IB_CM_COMM_ID cid,
2036                              void *param,
2037                              void *arg)
2038 {
2039         kib_conn_t    *conn = arg;
2040         unsigned long  flags;
2041
2042         switch (event) {
2043         case TS_IB_CM_REP_RECEIVED: {
2044                 struct ib_cm_rep_received_param *rep = param;
2045                 kib_msg_t                       *msg = rep->remote_private_data;
2046                 int                              nob = rep->remote_private_data_len;
2047                 int                              rc;
2048
2049                 rc = kibnal_unpack_msg(msg, conn->ibc_version, nob);
2050                 if (rc != 0) {
2051                         CERROR ("Error %d unpacking conn ack from %s\n",
2052                                 rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
2053                         kibnal_connreq_done(conn, 1, rc);
2054                         kibnal_conn_decref(conn); /* drop CM's ref */
2055                         return TS_IB_CM_CALLBACK_ABORT;
2056                 }
2057
2058                 if (msg->ibm_type != IBNAL_MSG_CONNACK) {
2059                         CERROR ("Unexpected conn ack type %d from %s\n",
2060                                 msg->ibm_type, 
2061                                 libcfs_nid2str(conn->ibc_peer->ibp_nid));
2062                         kibnal_connreq_done(conn, 1, -EPROTO);
2063                         kibnal_conn_decref(conn); /* drop CM's ref */
2064                         return TS_IB_CM_CALLBACK_ABORT;
2065                 }
2066
2067                 if (!lnet_ptlcompat_matchnid(conn->ibc_peer->ibp_nid,
2068                                              msg->ibm_srcnid) ||
2069                     !lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
2070                                              msg->ibm_dstnid) ||
2071                     msg->ibm_srcstamp != conn->ibc_incarnation ||
2072                     msg->ibm_dststamp != kibnal_data.kib_incarnation) {
2073                         CERROR("Stale conn ack from %s\n",
2074                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
2075                         kibnal_connreq_done(conn, 1, -ESTALE);
2076                         kibnal_conn_decref(conn); /* drop CM's ref */
2077                         return TS_IB_CM_CALLBACK_ABORT;
2078                 }
2079
2080                 if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
2081                         CERROR ("Bad queue depth %d from %s\n",
2082                                 msg->ibm_u.connparams.ibcp_queue_depth,
2083                                 libcfs_nid2str(conn->ibc_peer->ibp_nid));
2084                         kibnal_connreq_done(conn, 1, -EPROTO);
2085                         kibnal_conn_decref(conn); /* drop CM's ref */
2086                         return TS_IB_CM_CALLBACK_ABORT;
2087                 }
2088                                 
2089                 CDEBUG(D_NET, "Connection %p -> %s REP_RECEIVED.\n",
2090                        conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
2091
2092                 conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2093                 conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
2094                 return TS_IB_CM_CALLBACK_PROCEED;
2095         }
2096
2097         case TS_IB_CM_ESTABLISHED:
2098                 CWARN("Connection %p -> %s ESTABLISHED\n",
2099                        conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
2100
2101                 kibnal_connreq_done(conn, 1, 0);
2102                 return TS_IB_CM_CALLBACK_PROCEED;
2103
2104         case TS_IB_CM_IDLE:
2105                 CDEBUG(D_NETERROR, "Connection %p -> %s IDLE\n",
2106                        conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
2107                 /* I assume this connection attempt was rejected because the
2108                  * peer found a stale QP; I'll just try again */
2109                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2110                 kibnal_schedule_active_connect_locked(conn->ibc_peer);
2111                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2112
2113                 kibnal_connreq_done(conn, 1, -ECONNABORTED);
2114                 kibnal_conn_decref(conn); /* drop CM's ref */
2115                 return TS_IB_CM_CALLBACK_ABORT;
2116
2117         default:
2118                 CDEBUG(D_NETERROR, "Connection %p -> %s ERROR %d\n",
2119                        conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), event);
2120                 kibnal_connreq_done(conn, 1, -ECONNABORTED);
2121                 kibnal_conn_decref(conn); /* drop CM's ref */
2122                 return TS_IB_CM_CALLBACK_ABORT;
2123         }
2124 }
2125
2126 int
2127 kibnal_pathreq_callback (tTS_IB_CLIENT_QUERY_TID tid, int status,
2128                           struct ib_path_record *resp, int remaining,
2129                           void *arg)
2130 {
2131         kib_conn_t *conn = arg;
2132         kib_peer_t *peer = conn->ibc_peer;
2133         kib_msg_t  *msg = &conn->ibc_connreq->cr_msg;
2134
2135         if (status != 0) {
2136                 CDEBUG (D_NETERROR, "Pathreq %p -> %s failed: %d\n",
2137                         conn, libcfs_nid2str(peer->ibp_nid), status);
2138                 kibnal_connreq_done(conn, 1, status);
2139                 kibnal_conn_decref(conn); /* drop callback's ref */
2140                 return 1;    /* non-zero prevents further callbacks */
2141         }
2142
2143         conn->ibc_connreq->cr_path = *resp;
2144
2145         kibnal_init_msg(msg, IBNAL_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
2146         msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
2147         kibnal_pack_msg(msg, conn->ibc_version, 0, 
2148                         peer->ibp_nid, conn->ibc_incarnation);
2149
2150         conn->ibc_connreq->cr_connparam = (struct ib_cm_active_param) {
2151                 .qp                   = conn->ibc_qp,
2152                 .req_private_data     = msg,
2153                 .req_private_data_len = msg->ibm_nob,
2154                 .responder_resources  = IBNAL_RESPONDER_RESOURCES,
2155                 .initiator_depth      = IBNAL_RESPONDER_RESOURCES,
2156                 .retry_count          = IBNAL_RETRY,
2157                 .rnr_retry_count      = IBNAL_RNR_RETRY,
2158                 .cm_response_timeout  = *kibnal_tunables.kib_timeout,
2159                 .max_cm_retries       = IBNAL_CM_RETRY,
2160                 .flow_control         = IBNAL_FLOW_CONTROL,
2161         };
2162
2163         /* XXX set timeout just like SDP!!!*/
2164         conn->ibc_connreq->cr_path.packet_life = 13;
2165         
2166         /* Flag I'm getting involved with the CM... */
2167         conn->ibc_state = IBNAL_CONN_CONNECTING;
2168
2169         CDEBUG(D_NET, "Connecting to, service id "LPX64", on %s\n",
2170                conn->ibc_connreq->cr_svcrsp.ibsr_svc_id, 
2171                libcfs_nid2str(peer->ibp_nid));
2172
2173         /* kibnal_connect_callback gets my conn ref */
2174         status = ib_cm_connect (&conn->ibc_connreq->cr_connparam, 
2175                                 &conn->ibc_connreq->cr_path, NULL,
2176                                 conn->ibc_connreq->cr_svcrsp.ibsr_svc_id, 0,
2177                                 kibnal_active_conn_callback, conn,
2178                                 &conn->ibc_comm_id);
2179         if (status != 0) {
2180                 CERROR ("Connect %p -> %s failed: %d\n",
2181                         conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), status);
2182                 /* Back out state change: I've not got a CM comm_id yet... */
2183                 conn->ibc_state = IBNAL_CONN_INIT_QP;
2184                 kibnal_connreq_done(conn, 1, status);
2185                 kibnal_conn_decref(conn); /* Drop callback's ref */
2186         }
2187         
2188         return 1;    /* non-zero to prevent further callbacks */
2189 }
2190
2191 void
2192 kibnal_connect_peer (kib_peer_t *peer)
2193 {
2194         kib_conn_t  *conn;
2195         int          rc;
2196
2197         conn = kibnal_create_conn();
2198         if (conn == NULL) {
2199                 CERROR ("Can't allocate conn\n");
2200                 kibnal_peer_connect_failed (peer, 1, -ENOMEM);
2201                 return;
2202         }
2203
2204         conn->ibc_peer = peer;
2205         kibnal_peer_addref(peer);
2206
2207         LIBCFS_ALLOC (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
2208         if (conn->ibc_connreq == NULL) {
2209                 CERROR ("Can't allocate connreq\n");
2210                 kibnal_connreq_done(conn, 1, -ENOMEM);
2211                 kibnal_conn_decref(conn); /* drop my ref */
2212                 return;
2213         }
2214
2215         memset(conn->ibc_connreq, 0, sizeof (*conn->ibc_connreq));
2216
2217         rc = kibnal_make_svcqry(conn);
2218         if (rc != 0) {
2219                 kibnal_connreq_done (conn, 1, rc);
2220                 kibnal_conn_decref(conn); /* drop my ref */
2221                 return;
2222         }
2223
2224         rc = ib_cached_gid_get(kibnal_data.kib_device,
2225                                kibnal_data.kib_port, 0,
2226                                conn->ibc_connreq->cr_gid);
2227         LASSERT (rc == 0);
2228
2229         /* kibnal_pathreq_callback gets my conn ref */
2230         rc = tsIbPathRecordRequest (kibnal_data.kib_device,
2231                                     kibnal_data.kib_port,
2232                                     conn->ibc_connreq->cr_gid,
2233                                     conn->ibc_connreq->cr_svcrsp.ibsr_svc_gid,
2234                                     conn->ibc_connreq->cr_svcrsp.ibsr_svc_pkey,
2235                                     0,
2236                                     *kibnal_tunables.kib_timeout * HZ,
2237                                     0,
2238                                     kibnal_pathreq_callback, conn, 
2239                                     &conn->ibc_connreq->cr_tid);
2240         if (rc == 0)
2241                 return; /* callback now has my ref on conn */
2242
2243         CERROR ("Path record request %p -> %s failed: %d\n",
2244                 conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
2245         kibnal_connreq_done(conn, 1, rc);
2246         kibnal_conn_decref(conn); /* drop my ref */
2247 }
2248
2249 int
2250 kibnal_check_txs (kib_conn_t *conn, struct list_head *txs)
2251 {
2252         kib_tx_t          *tx;
2253         struct list_head  *ttmp;
2254         unsigned long      flags;
2255         int                timed_out = 0;
2256
2257         spin_lock_irqsave (&conn->ibc_lock, flags);
2258
2259         list_for_each (ttmp, txs) {
2260                 tx = list_entry (ttmp, kib_tx_t, tx_list);
2261
2262                 if (txs == &conn->ibc_active_txs) {
2263                         LASSERT (tx->tx_passive_rdma ||
2264                                  !tx->tx_passive_rdma_wait);
2265
2266                         LASSERT (tx->tx_passive_rdma_wait ||
2267                                  tx->tx_sending != 0);
2268                 } else {
2269                         LASSERT (!tx->tx_passive_rdma_wait);
2270                         LASSERT (tx->tx_sending == 0);
2271                 }
2272                 
2273                 if (time_after_eq (jiffies, tx->tx_deadline)) {
2274                         timed_out = 1;
2275                         break;
2276                 }
2277         }
2278
2279         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2280         return timed_out;
2281 }
2282
2283 int
2284 kibnal_conn_timed_out (kib_conn_t *conn)
2285 {
2286         return  kibnal_check_txs(conn, &conn->ibc_tx_queue) ||
2287                 kibnal_check_txs(conn, &conn->ibc_tx_queue_rsrvd) ||
2288                 kibnal_check_txs(conn, &conn->ibc_tx_queue_nocred) ||
2289                 kibnal_check_txs(conn, &conn->ibc_active_txs);
2290 }
2291
2292 void
2293 kibnal_check_conns (int idx)
2294 {
2295         struct list_head  *peers = &kibnal_data.kib_peers[idx];
2296         struct list_head  *ptmp;
2297         kib_peer_t        *peer;
2298         kib_conn_t        *conn;
2299         struct list_head  *ctmp;
2300         unsigned long      flags;
2301
2302  again:
2303         /* NB. We expect to have a look at all the peers and not find any
2304          * rdmas to time out, so we just use a shared lock while we
2305          * take a look... */
2306         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2307
2308         list_for_each (ptmp, peers) {
2309                 peer = list_entry (ptmp, kib_peer_t, ibp_list);
2310
2311                 list_for_each (ctmp, &peer->ibp_conns) {
2312                         conn = list_entry (ctmp, kib_conn_t, ibc_list);
2313
2314                         LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);
2315
2316
2317                         /* In case we have enough credits to return via a
2318                          * NOOP, but there were no non-blocking tx descs
2319                          * free to do it last time... */
2320                         kibnal_check_sends(conn);
2321
2322                         if (!kibnal_conn_timed_out(conn))
2323                                 continue;
2324                         
2325                         kibnal_conn_addref(conn);
2326
2327                         read_unlock_irqrestore(&kibnal_data.kib_global_lock,
2328                                                flags);
2329
2330                         CERROR("Timed out RDMA with %s\n",
2331                                libcfs_nid2str(peer->ibp_nid));
2332
2333                         kibnal_close_conn (conn, -ETIMEDOUT);
2334                         kibnal_conn_decref(conn);
2335
2336                         /* start again now I've dropped the lock */
2337                         goto again;
2338                 }
2339         }
2340
2341         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2342 }
2343
2344 void
2345 kibnal_terminate_conn (kib_conn_t *conn)
2346 {
2347         int           rc;
2348
2349         CDEBUG(D_NET, "conn %p\n", conn);
2350         LASSERT (conn->ibc_state == IBNAL_CONN_DEATHROW);
2351         conn->ibc_state = IBNAL_CONN_ZOMBIE;
2352
2353         rc = ib_cm_disconnect (conn->ibc_comm_id);
2354         if (rc != 0)
2355                 CERROR ("Error %d disconnecting conn %p -> %s\n",
2356                         rc, conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
2357
2358         kibnal_peer_notify(conn->ibc_peer);
2359 }
2360
2361 int
2362 kibnal_reaper (void *arg)
2363 {
2364         wait_queue_t       wait;
2365         unsigned long      flags;
2366         kib_conn_t        *conn;
2367         int                timeout;
2368         int                i;
2369         int                peer_index = 0;
2370         unsigned long      deadline = jiffies;
2371         
2372         cfs_daemonize ("kibnal_reaper");
2373         cfs_block_allsigs ();
2374
2375         init_waitqueue_entry (&wait, current);
2376
2377         spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
2378
2379         while (!kibnal_data.kib_shutdown) {
2380                 if (!list_empty (&kibnal_data.kib_reaper_conns)) {
2381                         conn = list_entry (kibnal_data.kib_reaper_conns.next,
2382                                            kib_conn_t, ibc_list);
2383                         list_del (&conn->ibc_list);
2384                         
2385                         spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);
2386
2387                         switch (conn->ibc_state) {
2388                         case IBNAL_CONN_DEATHROW:
2389                                 LASSERT (conn->ibc_comm_id != TS_IB_CM_COMM_ID_INVALID);
2390                                 /* Disconnect: conn becomes a zombie in the
2391                                  * callback and last ref reschedules it
2392                                  * here... */
2393                                 kibnal_terminate_conn(conn);
2394                                 kibnal_conn_decref(conn);
2395                                 break;
2396
2397                         case IBNAL_CONN_INIT_QP:
2398                         case IBNAL_CONN_ZOMBIE:
2399                                 kibnal_destroy_conn (conn);
2400                                 break;
2401                                 
2402                         default:
2403                                 CERROR ("Bad conn %p state: %d\n",
2404                                         conn, conn->ibc_state);
2405                                 LBUG();
2406                         }
2407
2408                         spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
2409                         continue;
2410                 }
2411
2412                 spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);
2413
2414                 /* careful with the jiffy wrap... */
2415                 while ((timeout = (int)(deadline - jiffies)) <= 0) {
2416                         const int n = 4;
2417                         const int p = 1;
2418                         int       chunk = kibnal_data.kib_peer_hash_size;
2419                         
2420                         /* Time to check for RDMA timeouts on a few more
2421                          * peers: I do checks every 'p' seconds on a
2422                          * proportion of the peer table and I need to check
2423                          * every connection 'n' times within a timeout
2424                          * interval, to ensure I detect a timeout on any
2425                          * connection within (n+1)/n times the timeout
2426                          * interval. */
2427
2428                         if (*kibnal_tunables.kib_timeout > n * p)
2429                                 chunk = (chunk * n * p) / 
2430                                         *kibnal_tunables.kib_timeout;
2431                         if (chunk == 0)
2432                                 chunk = 1;
2433
2434                         for (i = 0; i < chunk; i++) {
2435                                 kibnal_check_conns (peer_index);
2436                                 peer_index = (peer_index + 1) % 
2437                                              kibnal_data.kib_peer_hash_size;
2438                         }
2439
2440                         deadline += p * HZ;
2441                 }
2442
2443                 kibnal_data.kib_reaper_waketime = jiffies + timeout;
2444
2445                 set_current_state (TASK_INTERRUPTIBLE);
2446                 add_wait_queue (&kibnal_data.kib_reaper_waitq, &wait);
2447
2448                 schedule_timeout (timeout);
2449
2450                 set_current_state (TASK_RUNNING);
2451                 remove_wait_queue (&kibnal_data.kib_reaper_waitq, &wait);
2452
2453                 spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
2454         }
2455
2456         spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);
2457
2458         kibnal_thread_fini ();
2459         return (0);
2460 }
2461
2462 int
2463 kibnal_connd (void *arg)
2464 {
2465         long               id = (long)arg;
2466         char               name[16];
2467         wait_queue_t       wait;
2468         unsigned long      flags;
2469         kib_peer_t        *peer;
2470         kib_acceptsock_t  *as;
2471         int                did_something;
2472
2473         snprintf(name, sizeof(name), "kibnal_connd_%02ld", id);
2474         cfs_daemonize(name);
2475         cfs_block_allsigs();
2476
2477         init_waitqueue_entry (&wait, current);
2478
2479         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2480
2481         while (!kibnal_data.kib_shutdown) {
2482                 did_something = 0;
2483
2484                 if (!list_empty (&kibnal_data.kib_connd_acceptq)) {
2485                         as = list_entry (kibnal_data.kib_connd_acceptq.next,
2486                                          kib_acceptsock_t, ibas_list);
2487                         list_del (&as->ibas_list);
2488                         
2489                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2490
2491                         kibnal_handle_svcqry(as->ibas_sock);
2492                         kibnal_free_acceptsock(as);
2493                         
2494                         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2495                         did_something = 1;
2496                 }
2497                         
2498                 /* Only handle an outgoing connection request if there is someone left
2499                  * to handle an incoming svcqry */
2500                 if (!list_empty (&kibnal_data.kib_connd_peers) &&
2501                     ((kibnal_data.kib_connd_connecting + 1) < 
2502                      *kibnal_tunables.kib_n_connd)) {
2503                         peer = list_entry (kibnal_data.kib_connd_peers.next,
2504                                            kib_peer_t, ibp_connd_list);
2505                         
2506                         list_del_init (&peer->ibp_connd_list);
2507                         kibnal_data.kib_connd_connecting++;
2508                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2509
2510                         kibnal_connect_peer (peer);
2511                         kibnal_peer_decref(peer);
2512
2513                         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
2514                         did_something = 1;
2515                         kibnal_data.kib_connd_connecting--;
2516                 }
2517
2518                 if (did_something)
2519                         continue;
2520
2521                 set_current_state (TASK_INTERRUPTIBLE);
2522                 add_wait_queue_exclusive(&kibnal_data.kib_connd_waitq, &wait);
2523
2524                 spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2525
2526                 schedule();
2527
2528                 set_current_state (TASK_RUNNING);
2529                 remove_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
2530
2531                 spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
2532         }
2533
2534         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2535
2536         kibnal_thread_fini ();
2537         return (0);
2538 }
2539
2540 int
2541 kibnal_scheduler(void *arg)
2542 {
2543         long            id = (long)arg;
2544         char            name[16];
2545         kib_rx_t       *rx;
2546         kib_tx_t       *tx;
2547         unsigned long   flags;
2548         int             rc;
2549         int             counter = 0;
2550         int             did_something;
2551
2552         snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
2553         cfs_daemonize(name);
2554         cfs_block_allsigs();
2555
2556         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
2557
2558         while (!kibnal_data.kib_shutdown) {
2559                 did_something = 0;
2560
2561                 while (!list_empty(&kibnal_data.kib_sched_txq)) {
2562                         tx = list_entry(kibnal_data.kib_sched_txq.next,
2563                                         kib_tx_t, tx_list);
2564                         list_del(&tx->tx_list);
2565                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
2566                                                flags);
2567                         kibnal_tx_done(tx);
2568
2569                         spin_lock_irqsave(&kibnal_data.kib_sched_lock,
2570                                           flags);
2571                 }
2572
2573                 if (!list_empty(&kibnal_data.kib_sched_rxq)) {
2574                         rx = list_entry(kibnal_data.kib_sched_rxq.next,
2575                                         kib_rx_t, rx_list);
2576                         list_del(&rx->rx_list);
2577                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
2578                                                flags);
2579
2580                         kibnal_rx(rx);
2581
2582                         did_something = 1;
2583                         spin_lock_irqsave(&kibnal_data.kib_sched_lock,
2584                                           flags);
2585                 }
2586
2587                 /* nothing to do or hogging CPU */
2588                 if (!did_something || counter++ == IBNAL_RESCHED) {
2589                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
2590                                                flags);
2591                         counter = 0;
2592
2593                         if (!did_something) {
2594                                 rc = wait_event_interruptible_exclusive(
2595                                         kibnal_data.kib_sched_waitq,
2596                                         !list_empty(&kibnal_data.kib_sched_txq) || 
2597                                         !list_empty(&kibnal_data.kib_sched_rxq) || 
2598                                         kibnal_data.kib_shutdown);
2599                         } else {
2600                                 our_cond_resched();
2601                         }
2602
2603                         spin_lock_irqsave(&kibnal_data.kib_sched_lock,
2604                                           flags);
2605                 }
2606         }
2607
2608         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
2609
2610         kibnal_thread_fini();
2611         return (0);
2612 }