lnet/klnds/openiblnd/openiblnd_cb.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "openibnal.h"

/*
 *  LIB functions follow
 *
 */
void
kibnal_schedule_tx_done (kib_tx_t *tx)
{
        unsigned long flags;

        spin_lock_irqsave (&kibnal_data.kib_sched_lock, flags);

        list_add_tail(&tx->tx_list, &kibnal_data.kib_sched_txq);
        wake_up (&kibnal_data.kib_sched_waitq);

        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
}
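
/* NB kibnal_schedule_tx_done() exists because some tx cleanup (memory
 * deregistration, FMR flushes) can't run in interrupt context; completions
 * that arrive in IRQ context are queued on kib_sched_txq for a scheduler
 * thread to finish via kibnal_tx_done() below. */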

void
kibnal_tx_done (kib_tx_t *tx)
{
        ptl_err_t        ptlrc = (tx->tx_status == 0) ? PTL_OK : PTL_FAIL;
        unsigned long    flags;
        int              i;
        int              rc;

        LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting callback */
        LASSERT (!tx->tx_passive_rdma_wait);    /* mustn't be awaiting RDMA */

        switch (tx->tx_mapped) {
        default:
                LBUG();

        case KIB_TX_UNMAPPED:
                break;

        case KIB_TX_MAPPED:
                if (in_interrupt()) {
                        /* can't deregister memory in IRQ context... */
                        kibnal_schedule_tx_done(tx);
                        return;
                }
                rc = ib_memory_deregister(tx->tx_md.md_handle.mr);
                LASSERT (rc == 0);
                tx->tx_mapped = KIB_TX_UNMAPPED;
                break;

#if IBNAL_FMR
        case KIB_TX_MAPPED_FMR:
                if (in_interrupt() && tx->tx_status != 0) {
                        /* can't flush FMRs in IRQ context... */
                        kibnal_schedule_tx_done(tx);
                        return;
                }

                rc = ib_fmr_deregister(tx->tx_md.md_handle.fmr);
                LASSERT (rc == 0);

                if (tx->tx_status != 0)
                        ib_fmr_pool_force_flush(kibnal_data.kib_fmr_pool);
                tx->tx_mapped = KIB_TX_UNMAPPED;
                break;
#endif
        }

        for (i = 0; i < 2; i++) {
                /* tx may have up to 2 libmsgs to finalise */
                if (tx->tx_libmsg[i] == NULL)
                        continue;

                lib_finalize (&kibnal_lib, NULL, tx->tx_libmsg[i], ptlrc);
                tx->tx_libmsg[i] = NULL;
        }

        if (tx->tx_conn != NULL) {
                kibnal_put_conn (tx->tx_conn);
                tx->tx_conn = NULL;
        }

        tx->tx_nsp = 0;
        tx->tx_passive_rdma = 0;
        tx->tx_status = 0;

        spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);

        if (tx->tx_isnblk) {
                list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_nblk_txs);
        } else {
                list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_txs);
                wake_up (&kibnal_data.kib_idle_tx_waitq);
        }

        spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
}
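
/* NB a tx can carry up to two lib messages: tx_libmsg[0] for the message
 * being sent and tx_libmsg[1] for the REPLY descriptor created in
 * kibnal_start_passive_rdma() when servicing a GET; both are finalised
 * above with the tx's completion status. */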

kib_tx_t *
kibnal_get_idle_tx (int may_block)
{
        unsigned long  flags;
        kib_tx_t      *tx = NULL;

        for (;;) {
                spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);

                /* "normal" descriptor is free */
                if (!list_empty (&kibnal_data.kib_idle_txs)) {
                        tx = list_entry (kibnal_data.kib_idle_txs.next,
                                         kib_tx_t, tx_list);
                        break;
                }

                if (!may_block) {
                        /* may dip into reserve pool */
                        if (list_empty (&kibnal_data.kib_idle_nblk_txs)) {
                                CERROR ("reserved tx desc pool exhausted\n");
                                break;
                        }

                        tx = list_entry (kibnal_data.kib_idle_nblk_txs.next,
                                         kib_tx_t, tx_list);
                        break;
                }

                /* block for idle tx */
                spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);

                wait_event (kibnal_data.kib_idle_tx_waitq,
                            !list_empty (&kibnal_data.kib_idle_txs) ||
                            kibnal_data.kib_shutdown);
        }

        if (tx != NULL) {
                list_del (&tx->tx_list);

                /* Allocate a new passive RDMA completion cookie.  It might
                 * not be needed, but we've got a lock right now and we're
                 * unlikely to wrap... */
                tx->tx_passive_rdma_cookie = kibnal_data.kib_next_tx_cookie++;

                LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
                LASSERT (tx->tx_nsp == 0);
                LASSERT (tx->tx_sending == 0);
                LASSERT (tx->tx_status == 0);
                LASSERT (tx->tx_conn == NULL);
                LASSERT (!tx->tx_passive_rdma);
                LASSERT (!tx->tx_passive_rdma_wait);
                LASSERT (tx->tx_libmsg[0] == NULL);
                LASSERT (tx->tx_libmsg[1] == NULL);
        }

        spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);

        return (tx);
}
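
/* NB two descriptor pools: callers in thread context pass may_block and
 * sleep on kib_idle_tx_waitq for a "normal" tx; non-blocking callers
 * (completion handlers, NOOP generation) fall back to the reserved
 * kib_idle_nblk_txs pool so they never sleep. */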

int
kibnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
{
        /* I would guess that if kibnal_get_peer (nid) == NULL,
           and we're not routing, then 'nid' is very distant :) */
        if ( nal->libnal_ni.ni_pid.nid == nid ) {
                *dist = 0;
        } else {
                *dist = 1;
        }

        return 0;
}

void
kibnal_complete_passive_rdma(kib_conn_t *conn, __u64 cookie, int status)
{
        struct list_head *ttmp;
        unsigned long     flags;
        int               idle;

        spin_lock_irqsave (&conn->ibc_lock, flags);

        list_for_each (ttmp, &conn->ibc_active_txs) {
                kib_tx_t *tx = list_entry(ttmp, kib_tx_t, tx_list);

                LASSERT (tx->tx_passive_rdma ||
                         !tx->tx_passive_rdma_wait);

                LASSERT (tx->tx_passive_rdma_wait ||
                         tx->tx_sending != 0);

                if (!tx->tx_passive_rdma_wait ||
                    tx->tx_passive_rdma_cookie != cookie)
                        continue;

                CDEBUG(D_NET, "Complete %p "LPD64": %d\n", tx, cookie, status);

                tx->tx_status = status;
                tx->tx_passive_rdma_wait = 0;
                idle = (tx->tx_sending == 0);

                if (idle)
                        list_del (&tx->tx_list);

                spin_unlock_irqrestore (&conn->ibc_lock, flags);

                /* I could be racing with tx callbacks.  It's whoever
                 * _makes_ tx idle that frees it */
                if (idle)
                        kibnal_tx_done (tx);
                return;
        }

        spin_unlock_irqrestore (&conn->ibc_lock, flags);

        CERROR ("Unmatched (late?) RDMA completion "LPX64" from "LPX64"\n",
                cookie, conn->ibc_peer->ibp_nid);
}
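
/* NB the cookie was allocated from kib_next_tx_cookie in
 * kibnal_get_idle_tx() and advertised to the peer in the PUT/GET_RDMA
 * request; the peer echoes it back in its ..._DONE message, which is
 * matched against ibc_active_txs here. */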

void
kibnal_post_rx (kib_rx_t *rx, int do_credits)
{
        kib_conn_t   *conn = rx->rx_conn;
        int           rc;
        unsigned long flags;

        rx->rx_gl = (struct ib_gather_scatter) {
                .address = rx->rx_vaddr,
                .length  = IBNAL_MSG_SIZE,
                .key     = conn->ibc_rx_pages->ibp_lkey,
        };

        rx->rx_sp = (struct ib_receive_param) {
                .work_request_id        = kibnal_ptr2wreqid(rx, 1),
                .scatter_list           = &rx->rx_gl,
                .num_scatter_entries    = 1,
                .device_specific        = NULL,
                .signaled               = 1,
        };

        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
        LASSERT (!rx->rx_posted);
        rx->rx_posted = 1;
        mb();

        if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
                rc = -ECONNABORTED;
        else
                rc = ib_receive (conn->ibc_qp, &rx->rx_sp, 1);

        if (rc == 0) {
                if (do_credits) {
                        spin_lock_irqsave(&conn->ibc_lock, flags);
                        conn->ibc_outstanding_credits++;
                        spin_unlock_irqrestore(&conn->ibc_lock, flags);

                        kibnal_check_sends(conn);
                }
                return;
        }

        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                CERROR ("Error posting receive -> "LPX64": %d\n",
                        conn->ibc_peer->ibp_nid, rc);
                kibnal_close_conn (rx->rx_conn, rc);
        } else {
                CDEBUG (D_NET, "Error posting receive -> "LPX64": %d\n",
                        conn->ibc_peer->ibp_nid, rc);
        }

        /* Drop rx's ref */
        kibnal_put_conn (conn);
}
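
/* NB re-posting a receive with do_credits != 0 returns a credit to the
 * peer: ibc_outstanding_credits is bumped and kibnal_check_sends() gets a
 * chance to piggyback it on an outgoing message (or a NOOP if the send
 * queue is empty and credits have piled up). */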

void
kibnal_rx_callback (struct ib_cq_entry *e)
{
        kib_rx_t     *rx = (kib_rx_t *)kibnal_wreqid2ptr(e->work_request_id);
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        int           credits;
        unsigned long flags;
        int           rc;

        CDEBUG (D_NET, "rx %p conn %p\n", rx, conn);
        LASSERT (rx->rx_posted);
        rx->rx_posted = 0;
        mb();

        /* receives complete with error in any case after we've started
         * closing the QP */
        if (conn->ibc_state >= IBNAL_CONN_DEATHROW)
                goto failed;

        /* We don't post receives until the conn is established */
        LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);

        if (e->status != IB_COMPLETION_STATUS_SUCCESS) {
                CERROR("Rx from "LPX64" failed: %d\n",
                       conn->ibc_peer->ibp_nid, e->status);
                goto failed;
        }

        rc = kibnal_unpack_msg(msg, e->bytes_transferred);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from "LPX64"\n",
                        rc, conn->ibc_peer->ibp_nid);
                goto failed;
        }

        if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
            msg->ibm_srcstamp != conn->ibc_incarnation ||
            msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid ||
            msg->ibm_dststamp != kibnal_data.kib_incarnation) {
                CERROR ("Stale rx from "LPX64"\n",
                        conn->ibc_peer->ibp_nid);
                goto failed;
        }

        /* Have I received credits that will let me send? */
        credits = msg->ibm_credits;
        if (credits != 0) {
                spin_lock_irqsave(&conn->ibc_lock, flags);
                conn->ibc_credits += credits;
                spin_unlock_irqrestore(&conn->ibc_lock, flags);

                kibnal_check_sends(conn);
        }

        switch (msg->ibm_type) {
        case IBNAL_MSG_NOOP:
                kibnal_post_rx (rx, 1);
                return;

        case IBNAL_MSG_IMMEDIATE:
                break;

        case IBNAL_MSG_PUT_RDMA:
        case IBNAL_MSG_GET_RDMA:
                CDEBUG(D_NET, "%d RDMA: cookie "LPX64", key %x, addr "LPX64", nob %d\n",
                       msg->ibm_type, msg->ibm_u.rdma.ibrm_cookie,
                       msg->ibm_u.rdma.ibrm_desc.rd_key,
                       msg->ibm_u.rdma.ibrm_desc.rd_addr,
                       msg->ibm_u.rdma.ibrm_desc.rd_nob);
                break;

        case IBNAL_MSG_PUT_DONE:
        case IBNAL_MSG_GET_DONE:
                CDEBUG(D_NET, "%d DONE: cookie "LPX64", status %d\n",
                       msg->ibm_type, msg->ibm_u.completion.ibcm_cookie,
                       msg->ibm_u.completion.ibcm_status);

                kibnal_complete_passive_rdma (conn,
                                              msg->ibm_u.completion.ibcm_cookie,
                                              msg->ibm_u.completion.ibcm_status);
                kibnal_post_rx (rx, 1);
                return;

        default:
                CERROR ("Bad msg type %x from "LPX64"\n",
                        msg->ibm_type, conn->ibc_peer->ibp_nid);
                goto failed;
        }

        /* schedule for kibnal_rx() in thread context */
        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);

        list_add_tail (&rx->rx_list, &kibnal_data.kib_sched_rxq);
        wake_up (&kibnal_data.kib_sched_waitq);

        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
        return;

 failed:
        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        kibnal_close_conn(conn, -ECONNABORTED);

        /* Don't re-post rx & drop its ref on conn */
        kibnal_put_conn(conn);
}
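
/* NB NOOPs and RDMA completions are fully handled above, in CQ callback
 * context; IMMEDIATE and PUT/GET_RDMA requests are queued on kib_sched_rxq
 * for kibnal_rx() instead, since handling them calls into lib_parse(),
 * which can't be done here (kibnal_start_active_rdma() asserts
 * !in_interrupt()). */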

void
kibnal_rx (kib_rx_t *rx)
{
        kib_msg_t   *msg = rx->rx_msg;

        /* Clear flag so I can detect if I've sent an RDMA completion */
        rx->rx_rdma = 0;

        switch (msg->ibm_type) {
        case IBNAL_MSG_GET_RDMA:
                lib_parse(&kibnal_lib, &msg->ibm_u.rdma.ibrm_hdr, rx);
                /* If the incoming get was matched, I'll have initiated the
                 * RDMA and the completion message... */
                if (rx->rx_rdma)
                        break;

                /* Otherwise, I'll send a failed completion now to prevent
                 * the peer's GET blocking for the full timeout. */
                CERROR ("Completing unmatched RDMA GET from "LPX64"\n",
                        rx->rx_conn->ibc_peer->ibp_nid);
                kibnal_start_active_rdma (IBNAL_MSG_GET_DONE, -EIO,
                                          rx, NULL, 0, NULL, NULL, 0, 0);
                break;

        case IBNAL_MSG_PUT_RDMA:
                lib_parse(&kibnal_lib, &msg->ibm_u.rdma.ibrm_hdr, rx);
                if (rx->rx_rdma)
                        break;
                /* This is most unusual, since even if lib_parse() didn't
                 * match anything, it should have asked us to read (and
                 * discard) the payload.  The portals header must be
                 * inconsistent with this message type, so it's the
                 * sender's fault for sending garbage and she can time
                 * herself out... */
                CERROR ("Uncompleted RDMA PUT from "LPX64"\n",
                        rx->rx_conn->ibc_peer->ibp_nid);
                break;

        case IBNAL_MSG_IMMEDIATE:
                lib_parse(&kibnal_lib, &msg->ibm_u.immediate.ibim_hdr, rx);
                LASSERT (!rx->rx_rdma);
                break;

        default:
                LBUG();
                break;
        }

        kibnal_post_rx (rx, 1);
}

#if 0
int
kibnal_kvaddr_to_phys (unsigned long vaddr, __u64 *physp)
{
        struct page *page;

        if (vaddr >= VMALLOC_START &&
            vaddr < VMALLOC_END)
                page = vmalloc_to_page ((void *)vaddr);
#if CONFIG_HIGHMEM
        else if (vaddr >= PKMAP_BASE &&
                 vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
                page = vmalloc_to_page ((void *)vaddr);
        /* in 2.4 ^ just walks the page tables */
#endif
        else
                page = virt_to_page (vaddr);

        if (page == NULL ||
            !VALID_PAGE (page))
                return (-EFAULT);

        *physp = kibnal_page2phys(page) + (vaddr & (PAGE_SIZE - 1));
        return (0);
}
#endif

int
kibnal_map_iov (kib_tx_t *tx, enum ib_memory_access access,
                 int niov, struct iovec *iov, int offset, int nob)
{
        void   *vaddr;
        int     rc;

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR ("Can't map multiple vaddr fragments\n");
                return (-EMSGSIZE);
        }

        vaddr = (void *)(((unsigned long)iov->iov_base) + offset);
        tx->tx_md.md_addr = (__u64)((unsigned long)vaddr);

        rc = ib_memory_register (kibnal_data.kib_pd,
                                 vaddr, nob,
                                 access,
                                 &tx->tx_md.md_handle.mr,
                                 &tx->tx_md.md_lkey,
                                 &tx->tx_md.md_rkey);

        if (rc != 0) {
                CERROR ("Can't map vaddr: %d\n", rc);
                return (rc);
        }

        tx->tx_mapped = KIB_TX_MAPPED;
        return (0);
}

int
kibnal_map_kiov (kib_tx_t *tx, enum ib_memory_access access,
                  int nkiov, ptl_kiov_t *kiov,
                  int offset, int nob)
{
#if IBNAL_FMR
        __u64                      *phys;
        const int                   mapped = KIB_TX_MAPPED_FMR;
#else
        struct ib_physical_buffer  *phys;
        const int                   mapped = KIB_TX_MAPPED;
#endif
        int                         page_offset;
        int                         nphys;
        int                         resid;
        int                         phys_size;
        int                         rc;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        phys_size = nkiov * sizeof (*phys);
        PORTAL_ALLOC(phys, phys_size);
        if (phys == NULL) {
                CERROR ("Can't allocate tmp phys\n");
                return (-ENOMEM);
        }

        page_offset = kiov->kiov_offset + offset;
#if IBNAL_FMR
        phys[0] = kibnal_page2phys(kiov->kiov_page);
#else
        phys[0].address = kibnal_page2phys(kiov->kiov_page);
        phys[0].size = PAGE_SIZE;
#endif
        nphys = 1;
        resid = nob - (kiov->kiov_len - offset);

        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        int i;
                        /* Can't have gaps */
                        CERROR ("Can't make payload contiguous in I/O VM:"
                                "page %d, offset %d, len %d\n", nphys,
                                kiov->kiov_offset, kiov->kiov_len);

                        for (i = -nphys; i < nkiov; i++) {
                                CERROR("kiov[%d] %p +%d for %d\n",
                                       i, kiov[i].kiov_page, kiov[i].kiov_offset, kiov[i].kiov_len);
                        }

                        rc = -EINVAL;
                        goto out;
                }

                if (nphys == PTL_MD_MAX_IOV) {
                        CERROR ("payload too big (%d)\n", nphys);
                        rc = -EMSGSIZE;
                        goto out;
                }

                LASSERT (nphys * sizeof (*phys) < phys_size);
#if IBNAL_FMR
                phys[nphys] = kibnal_page2phys(kiov->kiov_page);
#else
                phys[nphys].address = kibnal_page2phys(kiov->kiov_page);
                phys[nphys].size = PAGE_SIZE;
#endif
                nphys++;

                resid -= PAGE_SIZE;
        }

#if 0
        CWARN ("nphys %d, nob %d, page_offset %d\n", nphys, nob, page_offset);
        for (rc = 0; rc < nphys; rc++)
                CWARN ("   [%d] "LPX64" / %d\n", rc, phys[rc].address, phys[rc].size);
#endif
        tx->tx_md.md_addr = IBNAL_RDMA_BASE;

#if IBNAL_FMR
        rc = ib_fmr_register_physical (kibnal_data.kib_fmr_pool,
                                       phys, nphys,
                                       &tx->tx_md.md_addr,
                                       page_offset,
                                       &tx->tx_md.md_handle.fmr,
                                       &tx->tx_md.md_lkey,
                                       &tx->tx_md.md_rkey);
#else
        rc = ib_memory_register_physical (kibnal_data.kib_pd,
                                          phys, nphys,
                                          &tx->tx_md.md_addr,
                                          nob, page_offset,
                                          access,
                                          &tx->tx_md.md_handle.mr,
                                          &tx->tx_md.md_lkey,
                                          &tx->tx_md.md_rkey);
#endif
        if (rc == 0) {
                CDEBUG(D_NET, "Mapped %d pages %d bytes @ offset %d: lkey %x, rkey %x\n",
                       nphys, nob, page_offset, tx->tx_md.md_lkey, tx->tx_md.md_rkey);
                tx->tx_mapped = mapped;
        } else {
                CERROR ("Can't map phys: %d\n", rc);
                rc = -EFAULT;
        }

 out:
        PORTAL_FREE(phys, phys_size);
        return (rc);
}
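
/* NB two registration strategies, selected at compile time: with IBNAL_FMR
 * the page list is mapped through the FMR pool (and flushed on error in
 * kibnal_tx_done()); otherwise the payload is registered as a physical
 * memory region and deregistered when the tx completes. */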

kib_conn_t *
kibnal_find_conn_locked (kib_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->ibp_conns) {
                return (list_entry(tmp, kib_conn_t, ibc_list));
        }

        return (NULL);
}

void
kibnal_check_sends (kib_conn_t *conn)
{
        unsigned long   flags;
        kib_tx_t       *tx;
        int             rc;
        int             i;
        int             done;
        int             nwork;

        spin_lock_irqsave (&conn->ibc_lock, flags);

        LASSERT (conn->ibc_nsends_posted <= IBNAL_MSG_QUEUE_SIZE);

        if (list_empty(&conn->ibc_tx_queue) &&
            conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER) {
                spin_unlock_irqrestore(&conn->ibc_lock, flags);

                tx = kibnal_get_idle_tx(0);     /* don't block */
                if (tx != NULL)
                        kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);

                spin_lock_irqsave(&conn->ibc_lock, flags);

                if (tx != NULL) {
                        atomic_inc(&conn->ibc_refcount);
                        kibnal_queue_tx_locked(tx, conn);
                }
        }

        while (!list_empty (&conn->ibc_tx_queue)) {
                tx = list_entry (conn->ibc_tx_queue.next, kib_tx_t, tx_list);

                /* We rely on this for QP sizing */
                LASSERT (tx->tx_nsp > 0 && tx->tx_nsp <= 2);

                LASSERT (conn->ibc_outstanding_credits >= 0);
                LASSERT (conn->ibc_outstanding_credits <= IBNAL_MSG_QUEUE_SIZE);
                LASSERT (conn->ibc_credits >= 0);
                LASSERT (conn->ibc_credits <= IBNAL_MSG_QUEUE_SIZE);

                /* Not on ibc_rdma_queue */
                LASSERT (!tx->tx_passive_rdma_wait);

                if (conn->ibc_nsends_posted == IBNAL_MSG_QUEUE_SIZE)
                        break;

                if (conn->ibc_credits == 0)     /* no credits */
                        break;

                if (conn->ibc_credits == 1 &&   /* last credit reserved for */
                    conn->ibc_outstanding_credits == 0) /* giving back credits */
                        break;

                list_del (&tx->tx_list);

                if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
                    (!list_empty(&conn->ibc_tx_queue) ||
                     conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER)) {
                        /* redundant NOOP */
                        spin_unlock_irqrestore(&conn->ibc_lock, flags);
                        kibnal_tx_done(tx);
                        spin_lock_irqsave(&conn->ibc_lock, flags);
                        continue;
                }

                kibnal_pack_msg(tx->tx_msg, conn->ibc_outstanding_credits,
                                conn->ibc_peer->ibp_nid, conn->ibc_incarnation);

                conn->ibc_outstanding_credits = 0;
                conn->ibc_nsends_posted++;
                conn->ibc_credits--;

                tx->tx_sending = tx->tx_nsp;
                tx->tx_passive_rdma_wait = tx->tx_passive_rdma;
                list_add (&tx->tx_list, &conn->ibc_active_txs);

                spin_unlock_irqrestore (&conn->ibc_lock, flags);

                /* NB the gap between removing tx from the queue and sending it
                 * allows message re-ordering to occur */

                LASSERT (tx->tx_nsp > 0);

                rc = -ECONNABORTED;
                nwork = 0;
                if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                        tx->tx_status = 0;
                        /* Driver only accepts 1 item at a time */
                        for (i = 0; i < tx->tx_nsp; i++) {
                                rc = ib_send (conn->ibc_qp, &tx->tx_sp[i], 1);
                                if (rc != 0)
                                        break;
                                nwork++;
                        }
                }

                spin_lock_irqsave (&conn->ibc_lock, flags);
                if (rc != 0) {
                        /* NB credits are transferred in the actual
                         * message, which can only be the last work item */
                        conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
                        conn->ibc_credits++;
                        conn->ibc_nsends_posted--;

                        tx->tx_status = rc;
                        tx->tx_passive_rdma_wait = 0;
                        tx->tx_sending -= tx->tx_nsp - nwork;

                        done = (tx->tx_sending == 0);
                        if (done)
                                list_del (&tx->tx_list);

                        spin_unlock_irqrestore (&conn->ibc_lock, flags);

                        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
                                CERROR ("Error %d posting transmit to "LPX64"\n",
                                        rc, conn->ibc_peer->ibp_nid);
                        else
                                CDEBUG (D_NET, "Error %d posting transmit to "
                                        LPX64"\n", rc, conn->ibc_peer->ibp_nid);

                        kibnal_close_conn (conn, rc);

                        if (done)
                                kibnal_tx_done (tx);
                        return;
                }
        }

        spin_unlock_irqrestore (&conn->ibc_lock, flags);
}
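
/* NB credit flow in a nutshell: ibc_credits counts sends the peer has
 * given us permission to post; ibc_outstanding_credits counts receives we
 * have re-posted and owe back to the peer.  Every outgoing message carries
 * the owed credits (kibnal_pack_msg() above), the last local credit is
 * reserved for returning credits, and a NOOP is generated only when the
 * queue is idle and owed credits reach IBNAL_CREDIT_HIGHWATER. */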

void
kibnal_tx_callback (struct ib_cq_entry *e)
{
        kib_tx_t     *tx = (kib_tx_t *)kibnal_wreqid2ptr(e->work_request_id);
        kib_conn_t   *conn;
        unsigned long flags;
        int           idle;

        conn = tx->tx_conn;
        LASSERT (conn != NULL);
        LASSERT (tx->tx_sending != 0);

        spin_lock_irqsave(&conn->ibc_lock, flags);

        CDEBUG(D_NET, "conn %p tx %p [%d/%d]: %d\n", conn, tx,
               tx->tx_nsp - tx->tx_sending, tx->tx_nsp,
               e->status);

        /* I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'.  If it's
         * not me, then I take an extra ref on conn so it can't disappear
         * under me. */

        tx->tx_sending--;
        idle = (tx->tx_sending == 0) &&         /* This is the final callback */
               (!tx->tx_passive_rdma_wait);     /* Not waiting for RDMA completion */
        if (idle)
                list_del(&tx->tx_list);

        CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
               conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
               atomic_read (&conn->ibc_refcount));
        atomic_inc (&conn->ibc_refcount);

        if (tx->tx_sending == 0)
                conn->ibc_nsends_posted--;

        if (e->status != IB_COMPLETION_STATUS_SUCCESS &&
            tx->tx_status == 0)
                tx->tx_status = -ECONNABORTED;

        spin_unlock_irqrestore(&conn->ibc_lock, flags);

        if (idle)
                kibnal_tx_done (tx);

        if (e->status != IB_COMPLETION_STATUS_SUCCESS) {
                CERROR ("Tx completion to "LPX64" failed: %d\n",
                        conn->ibc_peer->ibp_nid, e->status);
                kibnal_close_conn (conn, -ENETDOWN);
        } else {
                /* can I shovel some more sends out the door? */
                kibnal_check_sends(conn);
        }

        kibnal_put_conn (conn);
}

void
kibnal_callback (struct ib_cq *cq, struct ib_cq_entry *e, void *arg)
{
        if (kibnal_wreqid_is_rx(e->work_request_id))
                kibnal_rx_callback (e);
        else
                kibnal_tx_callback (e);
}
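
/* NB rx and tx completions share one CQ; the work request id carries both
 * the descriptor pointer and an rx/tx discriminator (kibnal_ptr2wreqid()
 * is called with 1 for receives in kibnal_post_rx() and 0 for sends in
 * kibnal_init_tx_msg()), which kibnal_wreqid_is_rx() tests here. */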

void
kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob)
{
        struct ib_gather_scatter *gl = &tx->tx_gl[tx->tx_nsp];
        struct ib_send_param     *sp = &tx->tx_sp[tx->tx_nsp];
        int                       fence;
        int                       nob = offsetof (kib_msg_t, ibm_u) + body_nob;

        LASSERT (tx->tx_nsp >= 0 &&
                 tx->tx_nsp < sizeof(tx->tx_sp)/sizeof(tx->tx_sp[0]));
        LASSERT (nob <= IBNAL_MSG_SIZE);

        kibnal_init_msg(tx->tx_msg, type, body_nob);

        /* Fence the message if it's bundled with an RDMA read */
        fence = (tx->tx_nsp > 0) &&
                (type == IBNAL_MSG_PUT_DONE);

        *gl = (struct ib_gather_scatter) {
                .address = tx->tx_vaddr,
                .length  = nob,
                .key     = kibnal_data.kib_tx_pages->ibp_lkey,
        };

        /* NB If this is an RDMA read, the completion message must wait for
         * the RDMA to complete.  Sends wait for previous RDMA writes
         * anyway... */
        *sp = (struct ib_send_param) {
                .work_request_id      = kibnal_ptr2wreqid(tx, 0),
                .op                   = IB_OP_SEND,
                .gather_list          = gl,
                .num_gather_entries   = 1,
                .device_specific      = NULL,
                .solicited_event      = 1,
                .signaled             = 1,
                .immediate_data_valid = 0,
                .fence                = fence,
                .inline_data          = 0,
        };

        tx->tx_nsp++;
}
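
/* NB a tx posts at most two work items: an optional RDMA read/write set up
 * in kibnal_start_active_rdma() (tx_sp[0]) followed by the message send
 * appended here; kibnal_check_sends() relies on tx_nsp <= 2 for QP sizing.
 * Only PUT_DONE (bundled with an RDMA read) needs an explicit fence, since
 * a send is already ordered after a preceding RDMA write. */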

void
kibnal_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
{
        unsigned long         flags;

        spin_lock_irqsave(&conn->ibc_lock, flags);

        kibnal_queue_tx_locked (tx, conn);

        spin_unlock_irqrestore(&conn->ibc_lock, flags);

        kibnal_check_sends(conn);
}

void
kibnal_launch_tx (kib_tx_t *tx, ptl_nid_t nid)
{
        unsigned long    flags;
        kib_peer_t      *peer;
        kib_conn_t      *conn;
        rwlock_t        *g_lock = &kibnal_data.kib_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
        LASSERT (tx->tx_nsp > 0);               /* work items have been set up */

        read_lock (g_lock);

        peer = kibnal_find_peer_locked (nid);
        if (peer == NULL) {
                read_unlock (g_lock);
                tx->tx_status = -EHOSTUNREACH;
                kibnal_tx_done (tx);
                return;
        }

        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
                       conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
                       atomic_read (&conn->ibc_refcount));
                atomic_inc (&conn->ibc_refcount); /* 1 ref for the tx */
                read_unlock (g_lock);

                kibnal_queue_tx (tx, conn);
                return;
        }

        /* Making one or more connections; I'll need a write lock... */
        read_unlock (g_lock);
        write_lock_irqsave (g_lock, flags);

        peer = kibnal_find_peer_locked (nid);
        if (peer == NULL) {
                write_unlock_irqrestore (g_lock, flags);
                tx->tx_status = -EHOSTUNREACH;
                kibnal_tx_done (tx);
                return;
        }

        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
                       conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
                       atomic_read (&conn->ibc_refcount));
                atomic_inc (&conn->ibc_refcount); /* 1 ref for the tx */
                write_unlock_irqrestore (g_lock, flags);

                kibnal_queue_tx (tx, conn);
                return;
        }

        if (peer->ibp_connecting == 0) {
                if (!time_after_eq(jiffies, peer->ibp_reconnect_time)) {
                        write_unlock_irqrestore (g_lock, flags);
                        tx->tx_status = -EHOSTUNREACH;
                        kibnal_tx_done (tx);
                        return;
                }

                peer->ibp_connecting = 1;
                atomic_inc (&peer->ibp_refcount); /* extra ref for connd */

                spin_lock (&kibnal_data.kib_connd_lock);

                list_add_tail (&peer->ibp_connd_list,
                               &kibnal_data.kib_connd_peers);
                wake_up (&kibnal_data.kib_connd_waitq);

                spin_unlock (&kibnal_data.kib_connd_lock);
        }

        /* A connection is being established; queue the message... */
        list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);

        write_unlock_irqrestore (g_lock, flags);
}
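
/* NB classic lock-upgrade pattern: the common case (peer and conn both
 * exist) runs under the read lock; only when a connection must be
 * initiated is the write lock taken, and the peer/conn lookups are then
 * repeated since the state may have changed between unlock and relock. */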

ptl_err_t
kibnal_start_passive_rdma (int type, ptl_nid_t nid,
                            lib_msg_t *libmsg, ptl_hdr_t *hdr)
{
        int         nob = libmsg->md->length;
        kib_tx_t   *tx;
        kib_msg_t  *ibmsg;
        int         rc;
        int         access;

        LASSERT (type == IBNAL_MSG_PUT_RDMA ||
                 type == IBNAL_MSG_GET_RDMA);
        LASSERT (nob > 0);
        LASSERT (!in_interrupt());              /* Mapping could block */

        if (type == IBNAL_MSG_PUT_RDMA) {
                access = IB_ACCESS_REMOTE_READ;
        } else {
                access = IB_ACCESS_REMOTE_WRITE |
                         IB_ACCESS_LOCAL_WRITE;
        }

        tx = kibnal_get_idle_tx (1);           /* May block; caller is an app thread */
        LASSERT (tx != NULL);

        if ((libmsg->md->options & PTL_MD_KIOV) == 0)
                rc = kibnal_map_iov (tx, access,
                                     libmsg->md->md_niov,
                                     libmsg->md->md_iov.iov,
                                     0, nob);
        else
                rc = kibnal_map_kiov (tx, access,
                                      libmsg->md->md_niov,
                                      libmsg->md->md_iov.kiov,
                                      0, nob);

        if (rc != 0) {
                CERROR ("Can't map RDMA for "LPX64": %d\n", nid, rc);
                goto failed;
        }

        if (type == IBNAL_MSG_GET_RDMA) {
                /* reply gets finalized when tx completes */
                tx->tx_libmsg[1] = lib_create_reply_msg(&kibnal_lib,
                                                        nid, libmsg);
                if (tx->tx_libmsg[1] == NULL) {
                        CERROR ("Can't create reply for GET -> "LPX64"\n",
                                nid);
                        rc = -ENOMEM;
                        goto failed;
                }
        }

        tx->tx_passive_rdma = 1;

        ibmsg = tx->tx_msg;

        ibmsg->ibm_u.rdma.ibrm_hdr = *hdr;
        ibmsg->ibm_u.rdma.ibrm_cookie = tx->tx_passive_rdma_cookie;
        ibmsg->ibm_u.rdma.ibrm_desc.rd_key = tx->tx_md.md_rkey;
        ibmsg->ibm_u.rdma.ibrm_desc.rd_addr = tx->tx_md.md_addr;
        ibmsg->ibm_u.rdma.ibrm_desc.rd_nob = nob;

        kibnal_init_tx_msg (tx, type, sizeof (kib_rdma_msg_t));

        CDEBUG(D_NET, "Passive: %p cookie "LPX64", key %x, addr "
               LPX64", nob %d\n",
               tx, tx->tx_passive_rdma_cookie, tx->tx_md.md_rkey,
               tx->tx_md.md_addr, nob);

        /* libmsg gets finalized when tx completes. */
        tx->tx_libmsg[0] = libmsg;

        kibnal_launch_tx(tx, nid);
        return (PTL_OK);

 failed:
        tx->tx_status = rc;
        kibnal_tx_done (tx);
        return (PTL_FAIL);
}
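
/* NB "passive" RDMA: this node maps its buffer and advertises the rkey,
 * address and a matching cookie in the PUT/GET_RDMA request; the peer
 * performs the actual transfer (the "active" side, below) and its
 * ..._DONE completion is matched by cookie in
 * kibnal_complete_passive_rdma(). */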

void
kibnal_start_active_rdma (int type, int status,
                           kib_rx_t *rx, lib_msg_t *libmsg,
                           unsigned int niov,
                           struct iovec *iov, ptl_kiov_t *kiov,
                           int offset, int nob)
{
        kib_msg_t    *rxmsg = rx->rx_msg;
        kib_msg_t    *txmsg;
        kib_tx_t     *tx;
        int           access;
        int           rdma_op;
        int           rc;

        CDEBUG(D_NET, "type %d, status %d, niov %d, offset %d, nob %d\n",
               type, status, niov, offset, nob);

        /* Called by scheduler */
        LASSERT (!in_interrupt ());

        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        /* No data if we're completing with failure */
        LASSERT (status == 0 || nob == 0);

        LASSERT (type == IBNAL_MSG_GET_DONE ||
                 type == IBNAL_MSG_PUT_DONE);

        /* Flag I'm completing the RDMA.  Even if I fail to send the
         * completion message, I will have tried my best so further
         * attempts shouldn't be tried. */
        LASSERT (!rx->rx_rdma);
        rx->rx_rdma = 1;

        if (type == IBNAL_MSG_GET_DONE) {
                access   = 0;
                rdma_op  = IB_OP_RDMA_WRITE;
                LASSERT (rxmsg->ibm_type == IBNAL_MSG_GET_RDMA);
        } else {
                access   = IB_ACCESS_LOCAL_WRITE;
                rdma_op  = IB_OP_RDMA_READ;
                LASSERT (rxmsg->ibm_type == IBNAL_MSG_PUT_RDMA);
        }

        tx = kibnal_get_idle_tx (0);           /* Mustn't block */
        if (tx == NULL) {
                CERROR ("tx descs exhausted on RDMA from "LPX64
                        " completing locally with failure\n",
                        rx->rx_conn->ibc_peer->ibp_nid);
                lib_finalize (&kibnal_lib, NULL, libmsg, PTL_NO_SPACE);
                return;
        }
        LASSERT (tx->tx_nsp == 0);

        if (nob != 0) {
                /* We actually need to transfer some data (the transfer
                 * size could get truncated to zero when the incoming
                 * message is matched) */

                if (kiov != NULL)
                        rc = kibnal_map_kiov (tx, access,
                                              niov, kiov, offset, nob);
                else
                        rc = kibnal_map_iov (tx, access,
                                             niov, iov, offset, nob);

                if (rc != 0) {
                        CERROR ("Can't map RDMA -> "LPX64": %d\n",
                                rx->rx_conn->ibc_peer->ibp_nid, rc);
                        /* We'll skip the RDMA and complete with failure. */
                        status = rc;
                        nob = 0;
                } else {
                        tx->tx_gl[0] = (struct ib_gather_scatter) {
                                .address = tx->tx_md.md_addr,
                                .length  = nob,
                                .key     = tx->tx_md.md_lkey,
                        };

                        tx->tx_sp[0] = (struct ib_send_param) {
                                .work_request_id      = kibnal_ptr2wreqid(tx, 0),
                                .op                   = rdma_op,
                                .gather_list          = &tx->tx_gl[0],
                                .num_gather_entries   = 1,
                                .remote_address       = rxmsg->ibm_u.rdma.ibrm_desc.rd_addr,
                                .rkey                 = rxmsg->ibm_u.rdma.ibrm_desc.rd_key,
                                .device_specific      = NULL,
                                .solicited_event      = 0,
                                .signaled             = 1,
                                .immediate_data_valid = 0,
                                .fence                = 0,
                                .inline_data          = 0,
                        };

                        tx->tx_nsp = 1;
                }
        }

        txmsg = tx->tx_msg;

        txmsg->ibm_u.completion.ibcm_cookie = rxmsg->ibm_u.rdma.ibrm_cookie;
        txmsg->ibm_u.completion.ibcm_status = status;

        kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));

        if (status == 0 && nob != 0) {
                LASSERT (tx->tx_nsp > 1);
                /* RDMA: libmsg gets finalized when the tx completes.  This
                 * is after the completion message has been sent, which in
                 * turn is after the RDMA has finished. */
                tx->tx_libmsg[0] = libmsg;
        } else {
                LASSERT (tx->tx_nsp == 1);
                /* No RDMA: local completion happens now! */
                CDEBUG(D_NET, "No data: immediate completion\n");
                lib_finalize (&kibnal_lib, NULL, libmsg,
                              status == 0 ? PTL_OK : PTL_FAIL);
        }

        /* +1 ref for this tx... */
        CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
               rx->rx_conn, rx->rx_conn->ibc_state,
               rx->rx_conn->ibc_peer->ibp_nid,
               atomic_read (&rx->rx_conn->ibc_refcount));
        atomic_inc (&rx->rx_conn->ibc_refcount);
        /* ...and queue it up */
        kibnal_queue_tx(tx, rx->rx_conn);
}

ptl_err_t
kibnal_sendmsg(lib_nal_t    *nal,
                void         *private,
                lib_msg_t    *libmsg,
                ptl_hdr_t    *hdr,
                int           type,
                ptl_nid_t     nid,
                ptl_pid_t     pid,
                unsigned int  payload_niov,
                struct iovec *payload_iov,
                ptl_kiov_t   *payload_kiov,
                int           payload_offset,
                int           payload_nob)
{
        kib_msg_t  *ibmsg;
        kib_tx_t   *tx;
        int         nob;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending %d bytes in %d frags to nid:"LPX64" pid %d\n",
               payload_nob, payload_niov, nid, pid);

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= PTL_MD_MAX_IOV);

        /* Thread context if we're sending payload */
        LASSERT (!in_interrupt() || payload_niov == 0);
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        switch (type) {
        default:
                LBUG();
                return (PTL_FAIL);

        case PTL_MSG_REPLY: {
                /* reply's 'private' is the incoming receive */
                kib_rx_t *rx = private;

                /* RDMA reply expected? */
                if (rx->rx_msg->ibm_type == IBNAL_MSG_GET_RDMA) {
                        kibnal_start_active_rdma(IBNAL_MSG_GET_DONE, 0,
                                                 rx, libmsg, payload_niov,
                                                 payload_iov, payload_kiov,
                                                 payload_offset, payload_nob);
                        return (PTL_OK);
                }

                /* Incoming message consistent with immediate reply? */
                if (rx->rx_msg->ibm_type != IBNAL_MSG_IMMEDIATE) {
                        CERROR ("REPLY to "LPX64" bad msg type %d!!!\n",
                                nid, rx->rx_msg->ibm_type);
                        return (PTL_FAIL);
                }

                /* Will it fit in a message? */
                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
                if (nob >= IBNAL_MSG_SIZE) {
                        CERROR("REPLY for "LPX64" too big (RDMA not requested): %d\n",
                               nid, payload_nob);
                        return (PTL_FAIL);
                }
                break;
        }

        case PTL_MSG_GET:
                /* might the REPLY message be big enough to need RDMA? */
                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[libmsg->md->length]);
                if (nob > IBNAL_MSG_SIZE)
                        return (kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA,
                                                          nid, libmsg, hdr));
                break;

        case PTL_MSG_ACK:
                LASSERT (payload_nob == 0);
                break;

        case PTL_MSG_PUT:
                /* Is the payload big enough to need RDMA? */
                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
                if (nob > IBNAL_MSG_SIZE)
                        return (kibnal_start_passive_rdma(IBNAL_MSG_PUT_RDMA,
                                                          nid, libmsg, hdr));

                break;
        }

        tx = kibnal_get_idle_tx(!(type == PTL_MSG_ACK ||
                                  type == PTL_MSG_REPLY ||
                                  in_interrupt()));
        if (tx == NULL) {
                CERROR ("Can't send %d to "LPX64": tx descs exhausted%s\n",
                        type, nid, in_interrupt() ? " (intr)" : "");
                return (PTL_NO_SPACE);
        }

        ibmsg = tx->tx_msg;
        ibmsg->ibm_u.immediate.ibim_hdr = *hdr;

        if (payload_nob > 0) {
                if (payload_kiov != NULL)
                        lib_copy_kiov2buf(ibmsg->ibm_u.immediate.ibim_payload,
                                          payload_niov, payload_kiov,
                                          payload_offset, payload_nob);
                else
                        lib_copy_iov2buf(ibmsg->ibm_u.immediate.ibim_payload,
                                         payload_niov, payload_iov,
                                         payload_offset, payload_nob);
        }

        kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE,
                            offsetof(kib_immediate_msg_t,
                                     ibim_payload[payload_nob]));

        /* libmsg gets finalized when tx completes */
        tx->tx_libmsg[0] = libmsg;

        kibnal_launch_tx(tx, nid);
        return (PTL_OK);
}
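
/* NB the send path picks its strategy by size: anything that fits in
 * IBNAL_MSG_SIZE (including the immediate header) is copied into the tx
 * buffer and sent as IBNAL_MSG_IMMEDIATE; bigger PUT payloads and GET
 * replies go through kibnal_start_passive_rdma() so the peer moves the
 * bulk data by RDMA instead. */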

ptl_err_t
kibnal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
               ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
               unsigned int payload_niov, struct iovec *payload_iov,
               size_t payload_offset, size_t payload_len)
{
        return (kibnal_sendmsg(nal, private, cookie,
                               hdr, type, nid, pid,
                               payload_niov, payload_iov, NULL,
                               payload_offset, payload_len));
}

ptl_err_t
kibnal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
                     ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                     unsigned int payload_niov, ptl_kiov_t *payload_kiov,
                     size_t payload_offset, size_t payload_len)
{
        return (kibnal_sendmsg(nal, private, cookie,
                               hdr, type, nid, pid,
                               payload_niov, NULL, payload_kiov,
                               payload_offset, payload_len));
}

ptl_err_t
kibnal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
                 unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
                 int offset, int mlen, int rlen)
{
        kib_rx_t    *rx = private;
        kib_msg_t   *rxmsg = rx->rx_msg;
        int          msg_nob;

        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt ());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch (rxmsg->ibm_type) {
        default:
                LBUG();
                return (PTL_FAIL);

        case IBNAL_MSG_IMMEDIATE:
                msg_nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
                if (msg_nob > IBNAL_MSG_SIZE) {
                        CERROR ("Immediate message from "LPX64" too big: %d\n",
                                rxmsg->ibm_u.immediate.ibim_hdr.src_nid, rlen);
                        return (PTL_FAIL);
                }

                if (kiov != NULL)
                        lib_copy_buf2kiov(niov, kiov, offset,
                                          rxmsg->ibm_u.immediate.ibim_payload,
                                          mlen);
                else
                        lib_copy_buf2iov(niov, iov, offset,
                                         rxmsg->ibm_u.immediate.ibim_payload,
                                         mlen);

                lib_finalize (nal, NULL, libmsg, PTL_OK);
                return (PTL_OK);

        case IBNAL_MSG_GET_RDMA:
                /* We get called here just to discard any junk after the
                 * GET hdr. */
                LASSERT (libmsg == NULL);
                lib_finalize (nal, NULL, libmsg, PTL_OK);
                return (PTL_OK);

        case IBNAL_MSG_PUT_RDMA:
                kibnal_start_active_rdma (IBNAL_MSG_PUT_DONE, 0,
                                          rx, libmsg,
                                          niov, iov, kiov, offset, mlen);
                return (PTL_OK);
        }
}
1421
ptl_err_t
kibnal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
              unsigned int niov, struct iovec *iov,
              size_t offset, size_t mlen, size_t rlen)
{
        return (kibnal_recvmsg (nal, private, msg, niov, iov, NULL,
                                offset, mlen, rlen));
}

ptl_err_t
kibnal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
                     unsigned int niov, ptl_kiov_t *kiov,
                     size_t offset, size_t mlen, size_t rlen)
{
        return (kibnal_recvmsg (nal, private, msg, niov, NULL, kiov,
                                offset, mlen, rlen));
}

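/* Thread accounting: every daemon started here must call
 * kibnal_thread_fini() on exit; kib_nthreads is how the rest of the
 * module can tell when all of them have gone away. */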
int
kibnal_thread_start (int (*fn)(void *arg), void *arg)
{
        long    pid = kernel_thread (fn, arg, 0);

        if (pid < 0)
                return ((int)pid);

        atomic_inc (&kibnal_data.kib_nthreads);
        return (0);
}

void
kibnal_thread_fini (void)
{
        atomic_dec (&kibnal_data.kib_nthreads);
}

void
kibnal_close_conn_locked (kib_conn_t *conn, int error)
{
        /* This just does the immediate housekeeping, and schedules the
         * connection for the reaper to finish off.
         * Caller holds kib_global_lock exclusively in irq context */
        kib_peer_t   *peer = conn->ibc_peer;

        CDEBUG (error == 0 ? D_NET : D_ERROR,
                "closing conn to "LPX64": error %d\n", peer->ibp_nid, error);

        LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED ||
                 conn->ibc_state == IBNAL_CONN_CONNECTING);

        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                /* kib_reaper_conns takes ibc_list's ref */
                list_del (&conn->ibc_list);
        } else {
                /* new ref for kib_reaper_conns */
                CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
                       conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
                       atomic_read (&conn->ibc_refcount));
                atomic_inc (&conn->ibc_refcount);
        }

        if (list_empty (&peer->ibp_conns) &&
            peer->ibp_persistence == 0) {
                /* Non-persistent peer with no more conns... */
                kibnal_unlink_peer_locked (peer);
        }

        conn->ibc_state = IBNAL_CONN_DEATHROW;

        /* Schedule conn for closing/destruction */
        spin_lock (&kibnal_data.kib_reaper_lock);

        list_add_tail (&conn->ibc_list, &kibnal_data.kib_reaper_conns);
        wake_up (&kibnal_data.kib_reaper_waitq);

        spin_unlock (&kibnal_data.kib_reaper_lock);
}

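/* Irq-safe wrapper for kibnal_close_conn_locked(); returns 1 if this
 * call scheduled the close, or 0 if the conn was already on its way out
 * (i.e. past IBNAL_CONN_ESTABLISHED). */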
int
kibnal_close_conn (kib_conn_t *conn, int why)
{
        unsigned long     flags;
        int               count = 0;

        write_lock_irqsave (&kibnal_data.kib_global_lock, flags);

        LASSERT (conn->ibc_state >= IBNAL_CONN_CONNECTING);

        if (conn->ibc_state <= IBNAL_CONN_ESTABLISHED) {
                count = 1;
                kibnal_close_conn_locked (conn, why);
        }

        write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
        return (count);
}

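/* Handle a failed connection attempt.  The reconnect interval backs off
 * exponentially: it doubles on each successive failure until it
 * saturates at IBNAL_MAX_RECONNECT_INTERVAL, and any transmits queued on
 * the peer are completed with -EHOSTUNREACH. */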
void
kibnal_peer_connect_failed (kib_peer_t *peer, int active, int rc)
{
        LIST_HEAD        (zombies);
        kib_tx_t         *tx;
        unsigned long     flags;

        LASSERT (rc != 0);
        LASSERT (peer->ibp_reconnect_interval >= IBNAL_MIN_RECONNECT_INTERVAL);

        write_lock_irqsave (&kibnal_data.kib_global_lock, flags);

        LASSERT (peer->ibp_connecting != 0);
        peer->ibp_connecting--;

        if (peer->ibp_connecting != 0) {
                /* another connection attempt under way (loopback?)... */
                write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
                return;
        }

        if (list_empty(&peer->ibp_conns)) {
                /* Say when active connection can be re-attempted */
                peer->ibp_reconnect_time = jiffies + peer->ibp_reconnect_interval;
                /* Increase reconnection interval */
                peer->ibp_reconnect_interval = MIN (peer->ibp_reconnect_interval * 2,
                                                    IBNAL_MAX_RECONNECT_INTERVAL);

                /* Take peer's blocked transmits; I'll complete
                 * them with error */
                while (!list_empty (&peer->ibp_tx_queue)) {
                        tx = list_entry (peer->ibp_tx_queue.next,
                                         kib_tx_t, tx_list);

                        list_del (&tx->tx_list);
                        list_add_tail (&tx->tx_list, &zombies);
                }

                if (kibnal_peer_active(peer) &&
                    (peer->ibp_persistence == 0)) {
                        /* failed connection attempt on non-persistent peer */
                        kibnal_unlink_peer_locked (peer);
                }
        } else {
                /* Can't have blocked transmits if there are connections */
                LASSERT (list_empty(&peer->ibp_tx_queue));
        }

        write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

        if (!list_empty (&zombies))
                CERROR ("Deleting messages for "LPX64": connection failed\n",
                        peer->ibp_nid);

        while (!list_empty (&zombies)) {
                tx = list_entry (zombies.next, kib_tx_t, tx_list);

                list_del (&tx->tx_list);
                /* complete now */
                tx->tx_status = -EHOSTUNREACH;
                kibnal_tx_done (tx);
        }
}

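/* Common completion point for active and passive connection attempts.
 * On success the conn joins the peer's list, the peer's blocked
 * transmits are queued on it and all its receive buffers are posted; on
 * failure the conn is handed to the close/zombie machinery and the peer
 * notified via kibnal_peer_connect_failed(). */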
void
kibnal_connreq_done (kib_conn_t *conn, int active, int status)
{
        int               state = conn->ibc_state;
        kib_peer_t       *peer = conn->ibc_peer;
        kib_tx_t         *tx;
        unsigned long     flags;
        int               rc;
        int               i;

        /* passive connection has no connreq & vice versa */
        LASSERT (!active == !(conn->ibc_connreq != NULL));
        if (active) {
                PORTAL_FREE (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
                conn->ibc_connreq = NULL;
        }

        if (state == IBNAL_CONN_CONNECTING) {
                /* Install common (active/passive) callback for
                 * disconnect/idle notification if I got as far as getting
                 * a CM comm_id */
                rc = tsIbCmCallbackModify(conn->ibc_comm_id,
                                          kibnal_conn_callback, conn);
                LASSERT (rc == 0);
        }

        write_lock_irqsave (&kibnal_data.kib_global_lock, flags);

        LASSERT (peer->ibp_connecting != 0);

        if (status == 0) {
                /* connection established... */
                LASSERT (state == IBNAL_CONN_CONNECTING);
                conn->ibc_state = IBNAL_CONN_ESTABLISHED;

                if (!kibnal_peer_active(peer)) {
                        /* ...but peer deleted meantime */
                        status = -ECONNABORTED;
                }
        } else {
                LASSERT (state == IBNAL_CONN_INIT_QP ||
                         state == IBNAL_CONN_CONNECTING);
        }

        if (status == 0) {
                /* Everything worked! */

#warning "purge old conn incarnations"

                peer->ibp_connecting--;

                /* +1 ref for ibc_list; caller(== CM)'s ref remains until
                 * the IB_CM_IDLE callback */
                CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
                       conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
                       atomic_read (&conn->ibc_refcount));
                atomic_inc (&conn->ibc_refcount);
                list_add (&conn->ibc_list, &peer->ibp_conns);

                /* reset reconnect interval for next attempt */
                peer->ibp_reconnect_interval = IBNAL_MIN_RECONNECT_INTERVAL;

                /* post blocked sends to the new connection */
                spin_lock (&conn->ibc_lock);

                while (!list_empty (&peer->ibp_tx_queue)) {
                        tx = list_entry (peer->ibp_tx_queue.next,
                                         kib_tx_t, tx_list);

                        list_del (&tx->tx_list);

                        /* +1 ref for each tx */
                        CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
                               conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
                               atomic_read (&conn->ibc_refcount));
                        atomic_inc (&conn->ibc_refcount);
                        kibnal_queue_tx_locked (tx, conn);
                }

                spin_unlock (&conn->ibc_lock);

                /* Nuke any dangling conns from a different peer instance... */
                kibnal_close_stale_conns_locked (conn->ibc_peer,
                                                 conn->ibc_incarnation);

                write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

                /* queue up all the receives */
                for (i = 0; i < IBNAL_RX_MSGS; i++) {
                        /* +1 ref for rx desc */
                        CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
                               conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
                               atomic_read (&conn->ibc_refcount));
                        atomic_inc (&conn->ibc_refcount);

                        CDEBUG(D_NET, "RX[%d] %p->%p - "LPX64"\n",
                               i, &conn->ibc_rxs[i], conn->ibc_rxs[i].rx_msg,
                               conn->ibc_rxs[i].rx_vaddr);

                        kibnal_post_rx (&conn->ibc_rxs[i], 0);
                }

                kibnal_check_sends (conn);
                return;
        }

        /* connection failed */
        if (state == IBNAL_CONN_CONNECTING) {
                /* schedule for reaper to close */
                kibnal_close_conn_locked (conn, status);
        } else {
                /* Don't have a CM comm_id; just wait for refs to drain */
                conn->ibc_state = IBNAL_CONN_ZOMBIE;
        }

        write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

        kibnal_peer_connect_failed (conn->ibc_peer, active, status);

        if (state != IBNAL_CONN_CONNECTING) {
                /* drop caller's ref if we're not waiting for the
                 * IB_CM_IDLE callback */
                kibnal_put_conn (conn);
        }
}

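/* Passive half of connection establishment: unpack and sanity-check the
 * connreq (message type, queue depth, and that I'm still the instance
 * the peer queried), find or create the peer, and return a conn in
 * IBNAL_CONN_CONNECTING state for the CM to accept. */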
int
kibnal_accept (kib_conn_t **connp, tTS_IB_CM_COMM_ID cid,
               kib_msg_t *msg, int nob)
{
        kib_conn_t    *conn;
        kib_peer_t    *peer;
        kib_peer_t    *peer2;
        unsigned long  flags;
        int            rc;

        rc = kibnal_unpack_msg(msg, nob);
        if (rc != 0) {
                CERROR("Can't unpack connreq msg: %d\n", rc);
                return -EPROTO;
        }

        CDEBUG(D_NET, "connreq from "LPX64"\n", msg->ibm_srcnid);

        if (msg->ibm_type != IBNAL_MSG_CONNREQ) {
                CERROR("Unexpected connreq msg type: %x from "LPX64"\n",
                       msg->ibm_type, msg->ibm_srcnid);
                return -EPROTO;
        }

        if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
                CERROR("Can't accept "LPX64": bad queue depth %d (%d expected)\n",
                       msg->ibm_srcnid, msg->ibm_u.connparams.ibcp_queue_depth,
                       IBNAL_MSG_QUEUE_SIZE);
                return (-EPROTO);
        }

        conn = kibnal_create_conn();
        if (conn == NULL)
                return (-ENOMEM);

        /* assume 'nid' is a new peer */
        peer = kibnal_create_peer (msg->ibm_srcnid);
        if (peer == NULL) {
                /* NB conn->ibc_peer hasn't been set yet, so report the
                 * source nid from the connreq instead */
                CDEBUG(D_NET, "--conn[%p] state %d -> "LPX64" (%d)\n",
                       conn, conn->ibc_state, msg->ibm_srcnid,
                       atomic_read (&conn->ibc_refcount));
                atomic_dec (&conn->ibc_refcount);
                kibnal_destroy_conn(conn);
                return (-ENOMEM);
        }

        write_lock_irqsave (&kibnal_data.kib_global_lock, flags);

        /* Check I'm the same instance that gave the connection parameters.
         * NB If my incarnation changes after this, the peer will get nuked and
         * we'll spot that when the connection is finally added into the peer's
         * connlist */
        if (msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid ||
            msg->ibm_dststamp != kibnal_data.kib_incarnation) {
                write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

                CERROR("Stale connection params from "LPX64"\n",
                       msg->ibm_srcnid);
                atomic_dec(&conn->ibc_refcount);
                kibnal_destroy_conn(conn);
                kibnal_put_peer(peer);
                return -ESTALE;
        }

        peer2 = kibnal_find_peer_locked(msg->ibm_srcnid);
        if (peer2 == NULL) {
                /* peer table takes my ref on peer */
                list_add_tail (&peer->ibp_list,
                               kibnal_nid2peerlist(msg->ibm_srcnid));
        } else {
                kibnal_put_peer (peer);
                peer = peer2;
        }

        /* +1 ref for conn */
        atomic_inc (&peer->ibp_refcount);
        peer->ibp_connecting++;

        write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

        conn->ibc_peer = peer;
        conn->ibc_state = IBNAL_CONN_CONNECTING;
        conn->ibc_comm_id = cid;
        conn->ibc_incarnation = msg->ibm_srcstamp;
        conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;

        *connp = conn;
        return (0);
}

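/* CM callback chain: the active and passive callbacks below handle
 * connection establishment; kibnal_connreq_done() then re-points the
 * comm_id at kibnal_conn_callback() for disconnect/idle events, and
 * after TS_IB_CM_IDLE this terminal callback is installed, which should
 * never fire. */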
tTS_IB_CM_CALLBACK_RETURN
kibnal_idle_conn_callback (tTS_IB_CM_EVENT event,
                            tTS_IB_CM_COMM_ID cid,
                            void *param,
                            void *arg)
{
        /* Shouldn't ever get a callback after TS_IB_CM_IDLE */
        CERROR ("Unexpected event %d: conn %p\n", event, arg);
        LBUG ();
        return TS_IB_CM_CALLBACK_PROCEED;
}

tTS_IB_CM_CALLBACK_RETURN
kibnal_conn_callback (tTS_IB_CM_EVENT event,
                       tTS_IB_CM_COMM_ID cid,
                       void *param,
                       void *arg)
{
        kib_conn_t       *conn = arg;
        LIST_HEAD        (zombies);
        struct list_head *tmp;
        struct list_head *nxt;
        kib_tx_t         *tx;
        unsigned long     flags;
        int               done;
        int               rc;

        /* Established Connection Notifier */

        switch (event) {
        default:
                CERROR("Connection %p -> "LPX64" ERROR %d\n",
                       conn, conn->ibc_peer->ibp_nid, event);
                kibnal_close_conn (conn, -ECONNABORTED);
                break;

        case TS_IB_CM_DISCONNECTED:
                CDEBUG(D_WARNING, "Connection %p -> "LPX64" DISCONNECTED.\n",
                       conn, conn->ibc_peer->ibp_nid);
                kibnal_close_conn (conn, 0);
                break;

        case TS_IB_CM_IDLE:
                CDEBUG(D_NET, "Connection %p -> "LPX64" IDLE.\n",
                       conn, conn->ibc_peer->ibp_nid);
                kibnal_put_conn (conn);        /* Lose CM's ref */

                /* LASSERT (no further callbacks) */
                rc = tsIbCmCallbackModify(cid,
                                          kibnal_idle_conn_callback, conn);
                LASSERT (rc == 0);

                /* NB we wait until the connection has closed before
                 * completing outstanding passive RDMAs so we can be sure
                 * the network can't touch the mapped memory any more. */

                spin_lock_irqsave (&conn->ibc_lock, flags);

                /* grab passive RDMAs not waiting for the tx callback */
                list_for_each_safe (tmp, nxt, &conn->ibc_active_txs) {
                        tx = list_entry (tmp, kib_tx_t, tx_list);

                        LASSERT (tx->tx_passive_rdma ||
                                 !tx->tx_passive_rdma_wait);

                        LASSERT (tx->tx_passive_rdma_wait ||
                                 tx->tx_sending != 0);

                        /* still waiting for tx callback? */
                        if (!tx->tx_passive_rdma_wait)
                                continue;

                        tx->tx_status = -ECONNABORTED;
                        tx->tx_passive_rdma_wait = 0;
                        done = (tx->tx_sending == 0);

                        if (!done)
                                continue;

                        list_del (&tx->tx_list);
                        list_add (&tx->tx_list, &zombies);
                }

                /* grab all blocked transmits */
                list_for_each_safe (tmp, nxt, &conn->ibc_tx_queue) {
                        tx = list_entry (tmp, kib_tx_t, tx_list);

                        list_del (&tx->tx_list);
                        list_add (&tx->tx_list, &zombies);
                }

                spin_unlock_irqrestore (&conn->ibc_lock, flags);

                while (!list_empty(&zombies)) {
                        tx = list_entry (zombies.next, kib_tx_t, tx_list);

                        list_del(&tx->tx_list);
                        kibnal_tx_done (tx);
                }
                break;
        }

        return TS_IB_CM_CALLBACK_PROCEED;
}

tTS_IB_CM_CALLBACK_RETURN
kibnal_passive_conn_callback (tTS_IB_CM_EVENT event,
                               tTS_IB_CM_COMM_ID cid,
                               void *param,
                               void *arg)
{
        kib_conn_t  *conn = arg;
        int          rc;

        switch (event) {
        default:
                if (conn == NULL) {
                        /* no connection yet */
                        CERROR ("Unexpected event: %d\n", event);
                        return TS_IB_CM_CALLBACK_ABORT;
                }

                CERROR ("Unexpected event %p -> "LPX64": %d\n",
                        conn, conn->ibc_peer->ibp_nid, event);
                kibnal_connreq_done (conn, 0, -ECONNABORTED);
                break;

        case TS_IB_CM_REQ_RECEIVED: {
                struct ib_cm_req_received_param *req = param;
                kib_msg_t                       *msg = req->remote_private_data;

                LASSERT (conn == NULL);

                /* Don't really know srcnid until successful unpack */
                CDEBUG(D_NET, "REQ from ?"LPX64"?\n", msg->ibm_srcnid);

                rc = kibnal_accept(&conn, cid, msg,
                                   req->remote_private_data_len);
                if (rc != 0) {
                        CERROR ("Can't accept ?"LPX64"?: %d\n",
                                msg->ibm_srcnid, rc);
                        return TS_IB_CM_CALLBACK_ABORT;
                }

                /* update 'arg' for next callback */
                rc = tsIbCmCallbackModify(cid, kibnal_passive_conn_callback, conn);
                LASSERT (rc == 0);

                msg = req->accept_param.reply_private_data;
                kibnal_init_msg(msg, IBNAL_MSG_CONNACK,
                                sizeof(msg->ibm_u.connparams));

                msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;

                kibnal_pack_msg(msg, 0,
                                conn->ibc_peer->ibp_nid,
                                conn->ibc_incarnation);

                req->accept_param.qp                     = conn->ibc_qp;
                req->accept_param.reply_private_data_len = msg->ibm_nob;
                req->accept_param.responder_resources    = IBNAL_RESPONDER_RESOURCES;
                req->accept_param.initiator_depth        = IBNAL_RESPONDER_RESOURCES;
                req->accept_param.rnr_retry_count        = IBNAL_RNR_RETRY;
                req->accept_param.flow_control           = IBNAL_FLOW_CONTROL;

                CDEBUG(D_NET, "Proceeding\n");
                break;
        }

        case TS_IB_CM_ESTABLISHED:
                LASSERT (conn != NULL);
                CDEBUG(D_WARNING, "Connection %p -> "LPX64" ESTABLISHED.\n",
                       conn, conn->ibc_peer->ibp_nid);

                kibnal_connreq_done (conn, 0, 0);
                break;
        }

        /* NB if the connreq is done, we switch to kibnal_conn_callback */
        return TS_IB_CM_CALLBACK_PROCEED;
}

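/* Active-side CM events.  A REP carries the peer's CONNACK, which is
 * checked against what I asked for: correct message type, matching nids
 * and incarnation stamps (to catch a peer that rebooted meantime), and
 * the agreed queue depth. */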
tTS_IB_CM_CALLBACK_RETURN
kibnal_active_conn_callback (tTS_IB_CM_EVENT event,
                              tTS_IB_CM_COMM_ID cid,
                              void *param,
                              void *arg)
{
        kib_conn_t *conn = arg;

        switch (event) {
        case TS_IB_CM_REP_RECEIVED: {
                struct ib_cm_rep_received_param *rep = param;
                kib_msg_t                       *msg = rep->remote_private_data;
                int                              nob = rep->remote_private_data_len;
                int                              rc;

                rc = kibnal_unpack_msg(msg, nob);
                if (rc != 0) {
                        CERROR ("Error %d unpacking conn ack from "LPX64"\n",
                                rc, conn->ibc_peer->ibp_nid);
                        kibnal_connreq_done (conn, 1, rc);
                        break;
                }

                if (msg->ibm_type != IBNAL_MSG_CONNACK) {
                        CERROR ("Unexpected conn ack type %d from "LPX64"\n",
                                msg->ibm_type, conn->ibc_peer->ibp_nid);
                        kibnal_connreq_done (conn, 1, -EPROTO);
                        break;
                }

                if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
                    msg->ibm_srcstamp != conn->ibc_incarnation ||
                    msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid ||
                    msg->ibm_dststamp != kibnal_data.kib_incarnation) {
                        CERROR("Stale conn ack from "LPX64"\n",
                               conn->ibc_peer->ibp_nid);
                        kibnal_connreq_done (conn, 1, -ESTALE);
                        break;
                }

                if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
                        CERROR ("Bad queue depth %d from "LPX64"\n",
                                msg->ibm_u.connparams.ibcp_queue_depth,
                                conn->ibc_peer->ibp_nid);
                        kibnal_connreq_done (conn, 1, -EPROTO);
                        break;
                }

                CDEBUG(D_NET, "Connection %p -> "LPX64" REP_RECEIVED.\n",
                       conn, conn->ibc_peer->ibp_nid);

                conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
                break;
        }

        case TS_IB_CM_ESTABLISHED:
                CDEBUG(D_WARNING, "Connection %p -> "LPX64" Established\n",
                       conn, conn->ibc_peer->ibp_nid);

                kibnal_connreq_done (conn, 1, 0);
                break;

        case TS_IB_CM_IDLE:
                CERROR("Connection %p -> "LPX64" IDLE\n",
                       conn, conn->ibc_peer->ibp_nid);
                /* Back out state change: I'm disengaged from CM */
                conn->ibc_state = IBNAL_CONN_INIT_QP;

                kibnal_connreq_done (conn, 1, -ECONNABORTED);
                break;

        default:
                CERROR("Connection %p -> "LPX64" ERROR %d\n",
                       conn, conn->ibc_peer->ibp_nid, event);
                kibnal_connreq_done (conn, 1, -ECONNABORTED);
                break;
        }

        /* NB if the connreq is done, we switch to kibnal_conn_callback */
        return TS_IB_CM_CALLBACK_PROCEED;
}

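/* Path record lookup completion.  NB this always returns non-zero so
 * the query isn't continued; any failure from here on is reported
 * through kibnal_connreq_done(). */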
int
kibnal_pathreq_callback (tTS_IB_CLIENT_QUERY_TID tid, int status,
                          struct ib_path_record *resp, int remaining,
                          void *arg)
{
        kib_conn_t *conn = arg;
        kib_peer_t *peer = conn->ibc_peer;
        kib_msg_t  *msg = &conn->ibc_connreq->cr_msg;

        if (status != 0) {
                CERROR ("status %d\n", status);
                kibnal_connreq_done (conn, 1, status);
                goto out;
        }

        conn->ibc_connreq->cr_path = *resp;

        kibnal_init_msg(msg, IBNAL_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
        msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
        kibnal_pack_msg(msg, 0, peer->ibp_nid, conn->ibc_incarnation);

        conn->ibc_connreq->cr_connparam = (struct ib_cm_active_param) {
                .qp                   = conn->ibc_qp,
                .req_private_data     = msg,
                .req_private_data_len = msg->ibm_nob,
                .responder_resources  = IBNAL_RESPONDER_RESOURCES,
                .initiator_depth      = IBNAL_RESPONDER_RESOURCES,
                .retry_count          = IBNAL_RETRY,
                .rnr_retry_count      = IBNAL_RNR_RETRY,
                .cm_response_timeout  = kibnal_tunables.kib_io_timeout,
                .max_cm_retries       = IBNAL_CM_RETRY,
                .flow_control         = IBNAL_FLOW_CONTROL,
        };

        /* XXX set timeout just like SDP!!! */
        conn->ibc_connreq->cr_path.packet_life = 13;

        /* Flag I'm getting involved with the CM... */
        conn->ibc_state = IBNAL_CONN_CONNECTING;

        CDEBUG(D_WARNING, "Connecting to service id "LPX64" on "LPX64"\n",
               conn->ibc_connreq->cr_svcrsp.ibsr_svc_id, peer->ibp_nid);

        /* kibnal_active_conn_callback gets my conn ref */
        status = ib_cm_connect (&conn->ibc_connreq->cr_connparam,
                                &conn->ibc_connreq->cr_path, NULL,
                                conn->ibc_connreq->cr_svcrsp.ibsr_svc_id, 0,
                                kibnal_active_conn_callback, conn,
                                &conn->ibc_comm_id);
        if (status != 0) {
                CERROR ("Connect: %d\n", status);
                /* Back out state change: I've not got a CM comm_id yet... */
                conn->ibc_state = IBNAL_CONN_INIT_QP;
                kibnal_connreq_done (conn, 1, status);
        }

 out:
        /* return non-zero to prevent further callbacks */
        return 1;
}

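/* Active connection sequence: kibnal_make_svcqry() discovers the peer's
 * IB service over a plain socket, then I look up my local GID and
 * resolve a path record to the service GID; kibnal_pathreq_callback()
 * above takes it from there into the CM. */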
void
kibnal_connect_peer (kib_peer_t *peer)
{
        kib_conn_t  *conn;
        int          rc;

        conn = kibnal_create_conn();
        if (conn == NULL) {
                CERROR ("Can't allocate conn\n");
                kibnal_peer_connect_failed (peer, 1, -ENOMEM);
                return;
        }

        conn->ibc_peer = peer;
        atomic_inc (&peer->ibp_refcount);

        PORTAL_ALLOC (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
        if (conn->ibc_connreq == NULL) {
                CERROR ("Can't allocate connreq\n");
                kibnal_connreq_done (conn, 1, -ENOMEM);
                return;
        }

        memset(conn->ibc_connreq, 0, sizeof (*conn->ibc_connreq));

        rc = kibnal_make_svcqry(conn);
        if (rc != 0) {
                kibnal_connreq_done (conn, 1, rc);
                return;
        }

        rc = ib_cached_gid_get(kibnal_data.kib_device,
                               kibnal_data.kib_port, 0,
                               conn->ibc_connreq->cr_gid);
        LASSERT (rc == 0);

        /* kibnal_pathreq_callback gets my conn ref */
        rc = tsIbPathRecordRequest (kibnal_data.kib_device,
                                    kibnal_data.kib_port,
                                    conn->ibc_connreq->cr_gid,
                                    conn->ibc_connreq->cr_svcrsp.ibsr_svc_gid,
                                    conn->ibc_connreq->cr_svcrsp.ibsr_svc_pkey,
                                    0,
                                    kibnal_tunables.kib_io_timeout * HZ,
                                    0,
                                    kibnal_pathreq_callback, conn,
                                    &conn->ibc_connreq->cr_tid);
        if (rc == 0)
                return;

        CERROR ("Path record request: %d\n", rc);
        kibnal_connreq_done (conn, 1, rc);
}

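/* A conn has timed out if any queued or active tx has passed its
 * tx_deadline.  Both lists are scanned under ibc_lock so the verdict is
 * consistent with concurrent completions. */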
int
kibnal_conn_timed_out (kib_conn_t *conn)
{
        kib_tx_t          *tx;
        struct list_head  *ttmp;
        unsigned long      flags;

        spin_lock_irqsave (&conn->ibc_lock, flags);

        list_for_each (ttmp, &conn->ibc_tx_queue) {
                tx = list_entry (ttmp, kib_tx_t, tx_list);

                LASSERT (!tx->tx_passive_rdma_wait);
                LASSERT (tx->tx_sending == 0);

                if (time_after_eq (jiffies, tx->tx_deadline)) {
                        spin_unlock_irqrestore (&conn->ibc_lock, flags);
                        return 1;
                }
        }

        list_for_each (ttmp, &conn->ibc_active_txs) {
                tx = list_entry (ttmp, kib_tx_t, tx_list);

                LASSERT (tx->tx_passive_rdma ||
                         !tx->tx_passive_rdma_wait);

                LASSERT (tx->tx_passive_rdma_wait ||
                         tx->tx_sending != 0);

                if (time_after_eq (jiffies, tx->tx_deadline)) {
                        spin_unlock_irqrestore (&conn->ibc_lock, flags);
                        return 1;
                }
        }

        spin_unlock_irqrestore (&conn->ibc_lock, flags);

        return 0;
}

void
kibnal_check_conns (int idx)
{
        struct list_head  *peers = &kibnal_data.kib_peers[idx];
        struct list_head  *ptmp;
        kib_peer_t        *peer;
        kib_conn_t        *conn;
        struct list_head  *ctmp;

 again:
        /* NB. We expect to have a look at all the peers and not find any
         * rdmas to time out, so we just use a shared lock while we
         * take a look... */
        read_lock (&kibnal_data.kib_global_lock);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, kib_peer_t, ibp_list);

                list_for_each (ctmp, &peer->ibp_conns) {
                        conn = list_entry (ctmp, kib_conn_t, ibc_list);

                        LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);

                        /* In case we have enough credits to return via a
                         * NOOP, but there were no non-blocking tx descs
                         * free to do it last time... */
                        kibnal_check_sends(conn);

                        if (!kibnal_conn_timed_out(conn))
                                continue;

                        CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
                               conn, conn->ibc_state, peer->ibp_nid,
                               atomic_read (&conn->ibc_refcount));

                        atomic_inc (&conn->ibc_refcount);
                        read_unlock (&kibnal_data.kib_global_lock);

                        CERROR("Timed out RDMA with "LPX64"\n",
                               peer->ibp_nid);

                        kibnal_close_conn (conn, -ETIMEDOUT);
                        kibnal_put_conn (conn);

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock (&kibnal_data.kib_global_lock);
}

void
kibnal_terminate_conn (kib_conn_t *conn)
{
        int           rc;

        CDEBUG(D_NET, "conn %p\n", conn);
        LASSERT (conn->ibc_state == IBNAL_CONN_DEATHROW);
        conn->ibc_state = IBNAL_CONN_ZOMBIE;

        rc = ib_cm_disconnect (conn->ibc_comm_id);
        if (rc != 0)
                CERROR ("Error %d disconnecting conn %p -> "LPX64"\n",
                        rc, conn, conn->ibc_peer->ibp_nid);
}

int
kibnal_reaper (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        kib_conn_t        *conn;
        int                timeout;
        int                i;
        int                peer_index = 0;
        unsigned long      deadline = jiffies;

        kportal_daemonize ("kibnal_reaper");
        kportal_blockallsigs ();

        init_waitqueue_entry (&wait, current);

        spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);

        while (!kibnal_data.kib_shutdown) {
                if (!list_empty (&kibnal_data.kib_reaper_conns)) {
                        conn = list_entry (kibnal_data.kib_reaper_conns.next,
                                           kib_conn_t, ibc_list);
                        list_del (&conn->ibc_list);

                        spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);

                        switch (conn->ibc_state) {
                        case IBNAL_CONN_DEATHROW:
                                LASSERT (conn->ibc_comm_id != TS_IB_CM_COMM_ID_INVALID);
                                /* Disconnect: conn becomes a zombie in the
                                 * callback and last ref reschedules it
                                 * here... */
                                kibnal_terminate_conn(conn);
                                kibnal_put_conn (conn);
                                break;

                        case IBNAL_CONN_ZOMBIE:
                                kibnal_destroy_conn (conn);
                                break;

                        default:
                                CERROR ("Bad conn %p state: %d\n",
                                        conn, conn->ibc_state);
                                LBUG();
                        }

                        spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
                        continue;
                }

                spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);

                /* careful with the jiffy wrap... */
                while ((timeout = (int)(deadline - jiffies)) <= 0) {
                        const int n = 4;
                        const int p = 1;
                        int       chunk = kibnal_data.kib_peer_hash_size;

                        /* Time to check for RDMA timeouts on a few more
                         * peers: I do checks every 'p' seconds on a
                         * proportion of the peer table and I need to check
                         * every connection 'n' times within a timeout
                         * interval, to ensure I detect a timeout on any
                         * connection within (n+1)/n times the timeout
                         * interval. */
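                        /* For example (illustrative numbers only): with
                         * a 101-bucket peer table and kib_io_timeout of
                         * 50s, chunk = 101*4*1/50 = 8 buckets per wakeup,
                         * so the whole table is swept roughly every 13s,
                         * i.e. about 4 times per timeout interval. */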

                        if (kibnal_tunables.kib_io_timeout > n * p)
                                chunk = (chunk * n * p) /
                                        kibnal_tunables.kib_io_timeout;
                        if (chunk == 0)
                                chunk = 1;

                        for (i = 0; i < chunk; i++) {
                                kibnal_check_conns (peer_index);
                                peer_index = (peer_index + 1) %
                                             kibnal_data.kib_peer_hash_size;
                        }

                        deadline += p * HZ;
                }

                kibnal_data.kib_reaper_waketime = jiffies + timeout;

                set_current_state (TASK_INTERRUPTIBLE);
                add_wait_queue (&kibnal_data.kib_reaper_waitq, &wait);

                schedule_timeout (timeout);

                set_current_state (TASK_RUNNING);
                remove_wait_queue (&kibnal_data.kib_reaper_waitq, &wait);

                spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
        }

        spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);

        kibnal_thread_fini ();
        return (0);
}

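/* Connection daemon: drains two queues that other contexts feed, namely
 * accepted sockets awaiting a service-query response and peers awaiting
 * an active connection attempt, doing the blocking work that couldn't
 * be done where the items were queued. */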
int
kibnal_connd (void *arg)
{
        long               id = (long)arg;
        char               name[16];
        wait_queue_t       wait;
        unsigned long      flags;
        kib_peer_t        *peer;
        kib_acceptsock_t  *as;
        int                did_something;

        snprintf(name, sizeof(name), "kibnal_connd_%02ld", id);
        kportal_daemonize(name);
        kportal_blockallsigs();

        init_waitqueue_entry (&wait, current);

        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);

        while (!kibnal_data.kib_shutdown) {
                did_something = 0;

                if (!list_empty (&kibnal_data.kib_connd_acceptq)) {
                        as = list_entry (kibnal_data.kib_connd_acceptq.next,
                                         kib_acceptsock_t, ibas_list);
                        list_del (&as->ibas_list);

                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

                        kibnal_handle_svcqry(as->ibas_sock);
                        sock_release(as->ibas_sock);
                        PORTAL_FREE(as, sizeof(*as));

                        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
                        did_something = 1;
                }

                if (!list_empty (&kibnal_data.kib_connd_peers)) {
                        peer = list_entry (kibnal_data.kib_connd_peers.next,
                                           kib_peer_t, ibp_connd_list);

                        list_del_init (&peer->ibp_connd_list);
                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

                        kibnal_connect_peer (peer);
                        kibnal_put_peer (peer);

                        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                        did_something = 1;
                }

                if (did_something)
                        continue;

                set_current_state (TASK_INTERRUPTIBLE);
                add_wait_queue (&kibnal_data.kib_connd_waitq, &wait);

                spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

                schedule();

                set_current_state (TASK_RUNNING);
                remove_wait_queue (&kibnal_data.kib_connd_waitq, &wait);

                spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
        }

        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

        kibnal_thread_fini ();
        return (0);
}

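/* Scheduler daemon: finishes work deferred from contexts that couldn't
 * do it themselves, i.e. tx descriptors whose cleanup can't run in irq
 * context (see kibnal_schedule_tx_done()) and received messages queued
 * for processing; it voluntarily reschedules every IBNAL_RESCHED
 * iterations so it can't hog a CPU. */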
int
kibnal_scheduler(void *arg)
{
        long            id = (long)arg;
        char            name[16];
        kib_rx_t       *rx;
        kib_tx_t       *tx;
        unsigned long   flags;
        int             rc;
        int             counter = 0;
        int             did_something;

        snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
        kportal_daemonize(name);
        kportal_blockallsigs();

        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);

        while (!kibnal_data.kib_shutdown) {
                did_something = 0;

                while (!list_empty(&kibnal_data.kib_sched_txq)) {
                        tx = list_entry(kibnal_data.kib_sched_txq.next,
                                        kib_tx_t, tx_list);
                        list_del(&tx->tx_list);
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);
                        kibnal_tx_done(tx);

                        spin_lock_irqsave(&kibnal_data.kib_sched_lock,
                                          flags);
                }

                if (!list_empty(&kibnal_data.kib_sched_rxq)) {
                        rx = list_entry(kibnal_data.kib_sched_rxq.next,
                                        kib_rx_t, rx_list);
                        list_del(&rx->rx_list);
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);

                        kibnal_rx(rx);

                        did_something = 1;
                        spin_lock_irqsave(&kibnal_data.kib_sched_lock,
                                          flags);
                }

                /* nothing to do or hogging CPU */
                if (!did_something || counter++ == IBNAL_RESCHED) {
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);
                        counter = 0;

                        if (!did_something) {
                                rc = wait_event_interruptible(
                                        kibnal_data.kib_sched_waitq,
                                        !list_empty(&kibnal_data.kib_sched_txq) ||
                                        !list_empty(&kibnal_data.kib_sched_rxq) ||
                                        kibnal_data.kib_shutdown);
                        } else {
                                our_cond_resched();
                        }

                        spin_lock_irqsave(&kibnal_data.kib_sched_lock,
                                          flags);
                }
        }

        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);

        kibnal_thread_fini();
        return (0);
}

lib_nal_t kibnal_lib = {
        libnal_data:        &kibnal_data,      /* NAL private data */
        libnal_send:         kibnal_send,
        libnal_send_pages:   kibnal_send_pages,
        libnal_recv:         kibnal_recv,
        libnal_recv_pages:   kibnal_recv_pages,
        libnal_dist:         kibnal_dist
};