/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "openibnal.h"

/*
 *  LIB functions follow
 *
 */
void
kibnal_schedule_tx_done (kib_tx_t *tx)
{
        unsigned long flags;

        spin_lock_irqsave (&kibnal_data.kib_sched_lock, flags);

        list_add_tail(&tx->tx_list, &kibnal_data.kib_sched_txq);
        wake_up (&kibnal_data.kib_sched_waitq);

        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
}

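/* Return a completed tx to its idle pool.  A tx may carry up to two lib
 * messages to finalise and a ref on its connection, and its memory
 * mapping is torn down here.  Deregistration can't be done in IRQ
 * context, so completions arriving at interrupt time are handed off to
 * the scheduler thread via kibnal_schedule_tx_done(). */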
void
kibnal_tx_done (kib_tx_t *tx)
{
        ptl_err_t        ptlrc = (tx->tx_status == 0) ? PTL_OK : PTL_FAIL;
        unsigned long    flags;
        int              i;
        int              rc;

        LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting callback */
        LASSERT (!tx->tx_passive_rdma_wait);    /* mustn't be awaiting RDMA */

        switch (tx->tx_mapped) {
        default:
                LBUG();

        case KIB_TX_UNMAPPED:
                break;

        case KIB_TX_MAPPED:
                if (in_interrupt()) {
                        /* can't deregister memory in IRQ context... */
                        kibnal_schedule_tx_done(tx);
                        return;
                }
                rc = ib_memory_deregister(tx->tx_md.md_handle.mr);
                LASSERT (rc == 0);
                tx->tx_mapped = KIB_TX_UNMAPPED;
                break;

#if IBNAL_FMR
        case KIB_TX_MAPPED_FMR:
                if (in_interrupt() && tx->tx_status != 0) {
                        /* can't flush FMRs in IRQ context... */
                        kibnal_schedule_tx_done(tx);
                        return;
                }

                rc = ib_fmr_deregister(tx->tx_md.md_handle.fmr);
                LASSERT (rc == 0);

                if (tx->tx_status != 0)
                        ib_fmr_pool_force_flush(kibnal_data.kib_fmr_pool);
                tx->tx_mapped = KIB_TX_UNMAPPED;
                break;
#endif
        }

        for (i = 0; i < 2; i++) {
                /* tx may have up to 2 libmsgs to finalise */
                if (tx->tx_libmsg[i] == NULL)
                        continue;

                lib_finalize (&kibnal_lib, NULL, tx->tx_libmsg[i], ptlrc);
                tx->tx_libmsg[i] = NULL;
        }

        if (tx->tx_conn != NULL) {
                kibnal_put_conn (tx->tx_conn);
                tx->tx_conn = NULL;
        }

        tx->tx_nsp = 0;
        tx->tx_passive_rdma = 0;
        tx->tx_status = 0;

        spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);

        if (tx->tx_isnblk) {
                list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_nblk_txs);
        } else {
                list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_txs);
                wake_up (&kibnal_data.kib_idle_tx_waitq);
        }

        spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
}

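/* Grab an idle tx descriptor.  There are two pools: callers that may
 * block wait on the normal pool, while callers in atomic context
 * (may_block == 0) dip into the reserved non-blocking pool rather than
 * sleep. */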
kib_tx_t *
kibnal_get_idle_tx (int may_block)
{
        unsigned long  flags;
        kib_tx_t      *tx = NULL;

        for (;;) {
                spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);

                /* "normal" descriptor is free */
                if (!list_empty (&kibnal_data.kib_idle_txs)) {
                        tx = list_entry (kibnal_data.kib_idle_txs.next,
                                         kib_tx_t, tx_list);
                        break;
                }

                if (!may_block) {
                        /* may dip into reserve pool */
                        if (list_empty (&kibnal_data.kib_idle_nblk_txs)) {
                                CERROR ("reserved tx desc pool exhausted\n");
                                break;
                        }

                        tx = list_entry (kibnal_data.kib_idle_nblk_txs.next,
                                         kib_tx_t, tx_list);
                        break;
                }

                /* block for idle tx */
                spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);

                wait_event (kibnal_data.kib_idle_tx_waitq,
                            !list_empty (&kibnal_data.kib_idle_txs) ||
                            kibnal_data.kib_shutdown);
        }

        if (tx != NULL) {
                list_del (&tx->tx_list);

                /* Allocate a new passive RDMA completion cookie.  It might
                 * not be needed, but we've got a lock right now and we're
                 * unlikely to wrap... */
                tx->tx_passive_rdma_cookie = kibnal_data.kib_next_tx_cookie++;

                LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
                LASSERT (tx->tx_nsp == 0);
                LASSERT (tx->tx_sending == 0);
                LASSERT (tx->tx_status == 0);
                LASSERT (tx->tx_conn == NULL);
                LASSERT (!tx->tx_passive_rdma);
                LASSERT (!tx->tx_passive_rdma_wait);
                LASSERT (tx->tx_libmsg[0] == NULL);
                LASSERT (tx->tx_libmsg[1] == NULL);
        }

        spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);

        return (tx);
}

int
kibnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
{
        /* I would guess that if kibnal_get_peer (nid) == NULL,
           and we're not routing, then 'nid' is very distant :) */
        if ( nal->libnal_ni.ni_pid.nid == nid ) {
                *dist = 0;
        } else {
                *dist = 1;
        }

        return 0;
}

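/* The peer has finished an RDMA on one of our buffers and sent its DONE
 * message.  Match its cookie against the txs waiting on this connection;
 * whoever makes the tx idle (this path or the send completion callback)
 * is the one that frees it. */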
void
kibnal_complete_passive_rdma(kib_conn_t *conn, __u64 cookie, int status)
{
        struct list_head *ttmp;
        unsigned long     flags;
        int               idle;

        spin_lock_irqsave (&conn->ibc_lock, flags);

        list_for_each (ttmp, &conn->ibc_active_txs) {
                kib_tx_t *tx = list_entry(ttmp, kib_tx_t, tx_list);

                LASSERT (tx->tx_passive_rdma ||
                         !tx->tx_passive_rdma_wait);

                LASSERT (tx->tx_passive_rdma_wait ||
                         tx->tx_sending != 0);

                if (!tx->tx_passive_rdma_wait ||
                    tx->tx_passive_rdma_cookie != cookie)
                        continue;

                CDEBUG(D_NET, "Complete %p "LPD64": %d\n", tx, cookie, status);

                tx->tx_status = status;
                tx->tx_passive_rdma_wait = 0;
                idle = (tx->tx_sending == 0);

                if (idle)
                        list_del (&tx->tx_list);

                spin_unlock_irqrestore (&conn->ibc_lock, flags);

                /* I could be racing with tx callbacks.  It's whoever
                 * _makes_ tx idle that frees it */
                if (idle)
                        kibnal_tx_done (tx);
                return;
        }

        spin_unlock_irqrestore (&conn->ibc_lock, flags);

        CERROR ("Unmatched (late?) RDMA completion "LPX64" from "LPX64"\n",
                cookie, conn->ibc_peer->ibp_nid);
}

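/* (Re)post a receive buffer.  'do_credits' means an incoming message just
 * consumed this buffer, so a flow-control credit is now owed to the peer:
 * it's added to ibc_outstanding_credits and piggy-backed on the next
 * transmit. */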
void
kibnal_post_rx (kib_rx_t *rx, int do_credits)
{
        kib_conn_t   *conn = rx->rx_conn;
        int           rc;
        unsigned long flags;

        rx->rx_gl = (struct ib_gather_scatter) {
                .address = rx->rx_vaddr,
                .length  = IBNAL_MSG_SIZE,
                .key     = conn->ibc_rx_pages->ibp_lkey,
        };

        rx->rx_sp = (struct ib_receive_param) {
                .work_request_id        = kibnal_ptr2wreqid(rx, 1),
                .scatter_list           = &rx->rx_gl,
                .num_scatter_entries    = 1,
                .device_specific        = NULL,
                .signaled               = 1,
        };

        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
        LASSERT (!rx->rx_posted);
        rx->rx_posted = 1;
        mb();

        if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
                rc = -ECONNABORTED;
        else
                rc = ib_receive (conn->ibc_qp, &rx->rx_sp, 1);

        if (rc == 0) {
                if (do_credits) {
                        spin_lock_irqsave(&conn->ibc_lock, flags);
                        conn->ibc_outstanding_credits++;
                        spin_unlock_irqrestore(&conn->ibc_lock, flags);

                        kibnal_check_sends(conn);
                }
                return;
        }

        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                CERROR ("Error posting receive -> "LPX64": %d\n",
                        conn->ibc_peer->ibp_nid, rc);
                kibnal_close_conn (rx->rx_conn, rc);
        } else {
                CDEBUG (D_NET, "Error posting receive -> "LPX64": %d\n",
                        conn->ibc_peer->ibp_nid, rc);
        }

        /* Drop rx's ref */
        kibnal_put_conn (conn);
}

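/* Receive completion handler, called from the CQ callback.  Work request
 * ids encode both the descriptor pointer and an rx/tx flag (the second
 * argument to kibnal_ptr2wreqid()), which is how kibnal_callback() below
 * routes completions either here or to kibnal_tx_callback(). */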
void
kibnal_rx_callback (struct ib_cq_entry *e)
{
        kib_rx_t     *rx = (kib_rx_t *)kibnal_wreqid2ptr(e->work_request_id);
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        int           credits;
        unsigned long flags;
        int           rc;

        CDEBUG (D_NET, "rx %p conn %p\n", rx, conn);
        LASSERT (rx->rx_posted);
        rx->rx_posted = 0;
        mb();

        /* receives complete with error in any case after we've started
         * closing the QP */
        if (conn->ibc_state >= IBNAL_CONN_DEATHROW)
                goto failed;

        /* We don't post receives until the conn is established */
        LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);

        if (e->status != IB_COMPLETION_STATUS_SUCCESS) {
                CERROR("Rx from "LPX64" failed: %d\n",
                       conn->ibc_peer->ibp_nid, e->status);
                goto failed;
        }

        rc = kibnal_unpack_msg(msg, e->bytes_transferred);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from "LPX64"\n",
                        rc, conn->ibc_peer->ibp_nid);
                goto failed;
        }

        if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
            msg->ibm_srcstamp != conn->ibc_incarnation ||
            msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid ||
            msg->ibm_dststamp != kibnal_data.kib_incarnation) {
                CERROR ("Stale rx from "LPX64"\n",
                        conn->ibc_peer->ibp_nid);
                goto failed;
        }

        /* Have I received credits that will let me send? */
        credits = msg->ibm_credits;
        if (credits != 0) {
                spin_lock_irqsave(&conn->ibc_lock, flags);
                conn->ibc_credits += credits;
                spin_unlock_irqrestore(&conn->ibc_lock, flags);

                kibnal_check_sends(conn);
        }

        switch (msg->ibm_type) {
        case IBNAL_MSG_NOOP:
                kibnal_post_rx (rx, 1);
                return;

        case IBNAL_MSG_IMMEDIATE:
                break;

        case IBNAL_MSG_PUT_RDMA:
        case IBNAL_MSG_GET_RDMA:
                CDEBUG(D_NET, "%d RDMA: cookie "LPX64", key %x, addr "LPX64", nob %d\n",
                       msg->ibm_type, msg->ibm_u.rdma.ibrm_cookie,
                       msg->ibm_u.rdma.ibrm_desc.rd_key,
                       msg->ibm_u.rdma.ibrm_desc.rd_addr,
                       msg->ibm_u.rdma.ibrm_desc.rd_nob);
                break;

        case IBNAL_MSG_PUT_DONE:
        case IBNAL_MSG_GET_DONE:
                CDEBUG(D_NET, "%d DONE: cookie "LPX64", status %d\n",
                       msg->ibm_type, msg->ibm_u.completion.ibcm_cookie,
                       msg->ibm_u.completion.ibcm_status);

                kibnal_complete_passive_rdma (conn,
                                              msg->ibm_u.completion.ibcm_cookie,
                                              msg->ibm_u.completion.ibcm_status);
                kibnal_post_rx (rx, 1);
                return;

        default:
                CERROR ("Bad msg type %x from "LPX64"\n",
                        msg->ibm_type, conn->ibc_peer->ibp_nid);
                goto failed;
        }

        /* schedule for kibnal_rx() in thread context */
        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);

        list_add_tail (&rx->rx_list, &kibnal_data.kib_sched_rxq);
        wake_up (&kibnal_data.kib_sched_waitq);

        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
        return;

 failed:
        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        kibnal_close_conn(conn, -ECONNABORTED);

        /* Don't re-post rx & drop its ref on conn */
        kibnal_put_conn(conn);
}

void
kibnal_rx (kib_rx_t *rx)
{
        kib_msg_t   *msg = rx->rx_msg;

        /* Clear flag so I can detect if I've sent an RDMA completion */
        rx->rx_rdma = 0;

        switch (msg->ibm_type) {
        case IBNAL_MSG_GET_RDMA:
                lib_parse(&kibnal_lib, &msg->ibm_u.rdma.ibrm_hdr, rx);
                /* If the incoming get was matched, I'll have initiated the
                 * RDMA and the completion message... */
                if (rx->rx_rdma)
                        break;

                /* Otherwise, I'll send a failed completion now to prevent
                 * the peer's GET blocking for the full timeout. */
                CERROR ("Completing unmatched RDMA GET from "LPX64"\n",
                        rx->rx_conn->ibc_peer->ibp_nid);
                kibnal_start_active_rdma (IBNAL_MSG_GET_DONE, -EIO,
                                          rx, NULL, 0, NULL, NULL, 0, 0);
                break;

        case IBNAL_MSG_PUT_RDMA:
                lib_parse(&kibnal_lib, &msg->ibm_u.rdma.ibrm_hdr, rx);
                if (rx->rx_rdma)
                        break;
                /* This is most unusual, since even if lib_parse() didn't
                 * match anything, it should have asked us to read (and
                 * discard) the payload.  The portals header must be
                 * inconsistent with this message type, so it's the
                 * sender's fault for sending garbage and she can time
                 * herself out... */
                CERROR ("Uncompleted RDMA PUT from "LPX64"\n",
                        rx->rx_conn->ibc_peer->ibp_nid);
                break;

        case IBNAL_MSG_IMMEDIATE:
                lib_parse(&kibnal_lib, &msg->ibm_u.immediate.ibim_hdr, rx);
                LASSERT (!rx->rx_rdma);
                break;

        default:
                LBUG();
                break;
        }

        kibnal_post_rx (rx, 1);
}

#if 0
int
kibnal_kvaddr_to_phys (unsigned long vaddr, __u64 *physp)
{
        struct page *page;

        if (vaddr >= VMALLOC_START &&
            vaddr < VMALLOC_END)
                page = vmalloc_to_page ((void *)vaddr);
#if CONFIG_HIGHMEM
        else if (vaddr >= PKMAP_BASE &&
                 vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
                page = vmalloc_to_page ((void *)vaddr);
        /* in 2.4 ^ just walks the page tables */
#endif
        else
                page = virt_to_page (vaddr);

        if (page == NULL ||
            !VALID_PAGE (page))
                return (-EFAULT);

        *physp = kibnal_page2phys(page) + (vaddr & (PAGE_SIZE - 1));
        return (0);
}
#endif

int
kibnal_map_iov (kib_tx_t *tx, enum ib_memory_access access,
                int niov, struct iovec *iov, int offset, int nob)
{
        void   *vaddr;
        int     rc;

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR ("Can't map multiple vaddr fragments\n");
                return (-EMSGSIZE);
        }

        vaddr = (void *)(((unsigned long)iov->iov_base) + offset);
        tx->tx_md.md_addr = (__u64)((unsigned long)vaddr);

        rc = ib_memory_register (kibnal_data.kib_pd,
                                 vaddr, nob,
                                 access,
                                 &tx->tx_md.md_handle.mr,
                                 &tx->tx_md.md_lkey,
                                 &tx->tx_md.md_rkey);

        if (rc != 0) {
                CERROR ("Can't map vaddr: %d\n", rc);
                return (rc);
        }

        tx->tx_mapped = KIB_TX_MAPPED;
        return (0);
}

int
kibnal_map_kiov (kib_tx_t *tx, enum ib_memory_access access,
                 int nkiov, ptl_kiov_t *kiov,
                 int offset, int nob)
{
#if IBNAL_FMR
        __u64                      *phys;
        const int                   mapped = KIB_TX_MAPPED_FMR;
#else
        struct ib_physical_buffer  *phys;
        const int                   mapped = KIB_TX_MAPPED;
#endif
        int                         page_offset;
        int                         nphys;
        int                         resid;
        int                         phys_size;
        int                         rc;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        phys_size = nkiov * sizeof (*phys);
        PORTAL_ALLOC(phys, phys_size);
        if (phys == NULL) {
                CERROR ("Can't allocate tmp phys\n");
                return (-ENOMEM);
        }

        page_offset = kiov->kiov_offset + offset;
#if IBNAL_FMR
        phys[0] = kibnal_page2phys(kiov->kiov_page);
#else
        phys[0].address = kibnal_page2phys(kiov->kiov_page);
        phys[0].size = PAGE_SIZE;
#endif
        nphys = 1;
        resid = nob - (kiov->kiov_len - offset);

        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        int i;
                        /* Can't have gaps */
                        CERROR ("Can't make payload contiguous in I/O VM: "
                                "page %d, offset %d, len %d\n", nphys,
                                kiov->kiov_offset, kiov->kiov_len);

                        for (i = -nphys; i < nkiov; i++) {
                                CERROR("kiov[%d] %p +%d for %d\n",
                                       i, kiov[i].kiov_page,
                                       kiov[i].kiov_offset, kiov[i].kiov_len);
                        }

                        rc = -EINVAL;
                        goto out;
                }

                if (nphys == PTL_MD_MAX_IOV) {
                        CERROR ("payload too big (%d)\n", nphys);
                        rc = -EMSGSIZE;
                        goto out;
                }

                LASSERT (nphys * sizeof (*phys) < phys_size);
#if IBNAL_FMR
                phys[nphys] = kibnal_page2phys(kiov->kiov_page);
#else
                phys[nphys].address = kibnal_page2phys(kiov->kiov_page);
                phys[nphys].size = PAGE_SIZE;
#endif
                nphys++;

                resid -= PAGE_SIZE;
        }

        tx->tx_md.md_addr = IBNAL_RDMA_BASE;

#if IBNAL_FMR
        rc = ib_fmr_register_physical (kibnal_data.kib_fmr_pool,
                                       phys, nphys,
                                       &tx->tx_md.md_addr,
                                       page_offset,
                                       &tx->tx_md.md_handle.fmr,
                                       &tx->tx_md.md_lkey,
                                       &tx->tx_md.md_rkey);
#else
        rc = ib_memory_register_physical (kibnal_data.kib_pd,
                                          phys, nphys,
                                          &tx->tx_md.md_addr,
                                          nob, page_offset,
                                          access,
                                          &tx->tx_md.md_handle.mr,
                                          &tx->tx_md.md_lkey,
                                          &tx->tx_md.md_rkey);
#endif
        if (rc == 0) {
                CDEBUG(D_NET, "Mapped %d pages %d bytes @ offset %d: lkey %x, rkey %x\n",
                       nphys, nob, page_offset, tx->tx_md.md_lkey, tx->tx_md.md_rkey);
                tx->tx_mapped = mapped;
        } else {
                CERROR ("Can't map phys: %d\n", rc);
                rc = -EFAULT;
        }

 out:
        PORTAL_FREE(phys, phys_size);
        return (rc);
}

kib_conn_t *
kibnal_find_conn_locked (kib_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->ibp_conns) {
                return (list_entry(tmp, kib_conn_t, ibc_list));
        }

        return (NULL);
}

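/* Push queued sends.  Flow control is credit-based: ibc_credits says how
 * many messages we may post to the peer (one per receive buffer posted
 * for us); ibc_outstanding_credits counts buffers we've re-posted, which
 * are handed back to the peer in the header of the next message.  The
 * last credit is reserved for returning credits, and once
 * IBNAL_CREDIT_HIGHWATER credits accumulate with nothing queued, an
 * explicit NOOP is sent just to return them. */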
void
kibnal_check_sends (kib_conn_t *conn)
{
        unsigned long   flags;
        kib_tx_t       *tx;
        int             rc;
        int             i;
        int             done;
        int             nwork;

        spin_lock_irqsave (&conn->ibc_lock, flags);

        LASSERT (conn->ibc_nsends_posted <= IBNAL_MSG_QUEUE_SIZE);

        if (list_empty(&conn->ibc_tx_queue) &&
            conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER) {
                spin_unlock_irqrestore(&conn->ibc_lock, flags);

                tx = kibnal_get_idle_tx(0);     /* don't block */
                if (tx != NULL)
                        kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);

                spin_lock_irqsave(&conn->ibc_lock, flags);

                if (tx != NULL) {
                        atomic_inc(&conn->ibc_refcount);
                        kibnal_queue_tx_locked(tx, conn);
                }
        }

        while (!list_empty (&conn->ibc_tx_queue)) {
                tx = list_entry (conn->ibc_tx_queue.next, kib_tx_t, tx_list);

                /* We rely on this for QP sizing */
                LASSERT (tx->tx_nsp > 0 && tx->tx_nsp <= 2);

                LASSERT (conn->ibc_outstanding_credits >= 0);
                LASSERT (conn->ibc_outstanding_credits <= IBNAL_MSG_QUEUE_SIZE);
                LASSERT (conn->ibc_credits >= 0);
                LASSERT (conn->ibc_credits <= IBNAL_MSG_QUEUE_SIZE);

                /* Not on ibc_rdma_queue */
                LASSERT (!tx->tx_passive_rdma_wait);

                if (conn->ibc_nsends_posted == IBNAL_MSG_QUEUE_SIZE)
                        break;

                if (conn->ibc_credits == 0)     /* no credits */
                        break;

                if (conn->ibc_credits == 1 &&   /* last credit reserved for */
                    conn->ibc_outstanding_credits == 0) /* giving back credits */
                        break;

                list_del (&tx->tx_list);

                if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
                    (!list_empty(&conn->ibc_tx_queue) ||
                     conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER)) {
                        /* redundant NOOP */
                        spin_unlock_irqrestore(&conn->ibc_lock, flags);
                        kibnal_tx_done(tx);
                        spin_lock_irqsave(&conn->ibc_lock, flags);
                        continue;
                }

                kibnal_pack_msg(tx->tx_msg, conn->ibc_outstanding_credits,
                                conn->ibc_peer->ibp_nid, conn->ibc_incarnation);

                conn->ibc_outstanding_credits = 0;
                conn->ibc_nsends_posted++;
                conn->ibc_credits--;

                tx->tx_sending = tx->tx_nsp;
                tx->tx_passive_rdma_wait = tx->tx_passive_rdma;
                list_add (&tx->tx_list, &conn->ibc_active_txs);

                spin_unlock_irqrestore (&conn->ibc_lock, flags);

                /* NB the gap between removing tx from the queue and sending it
                 * allows message re-ordering to occur */

                LASSERT (tx->tx_nsp > 0);

                rc = -ECONNABORTED;
                nwork = 0;
                if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                        tx->tx_status = 0;
                        /* Driver only accepts 1 item at a time */
                        for (i = 0; i < tx->tx_nsp; i++) {
                                rc = ib_send (conn->ibc_qp, &tx->tx_sp[i], 1);
                                if (rc != 0)
                                        break;
                                nwork++;
                        }
                }

                spin_lock_irqsave (&conn->ibc_lock, flags);
                if (rc != 0) {
                        /* NB credits are transferred in the actual
                         * message, which can only be the last work item */
                        conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
                        conn->ibc_credits++;
                        conn->ibc_nsends_posted--;

                        tx->tx_status = rc;
                        tx->tx_passive_rdma_wait = 0;
                        tx->tx_sending -= tx->tx_nsp - nwork;

                        done = (tx->tx_sending == 0);
                        if (done)
                                list_del (&tx->tx_list);

                        spin_unlock_irqrestore (&conn->ibc_lock, flags);

                        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
                                CERROR ("Error %d posting transmit to "LPX64"\n",
                                        rc, conn->ibc_peer->ibp_nid);
                        else
                                CDEBUG (D_NET, "Error %d posting transmit to "
                                        LPX64"\n", rc, conn->ibc_peer->ibp_nid);

                        kibnal_close_conn (conn, rc);

                        if (done)
                                kibnal_tx_done (tx);
                        return;
                }
        }

        spin_unlock_irqrestore (&conn->ibc_lock, flags);
}

void
kibnal_tx_callback (struct ib_cq_entry *e)
{
        kib_tx_t     *tx = (kib_tx_t *)kibnal_wreqid2ptr(e->work_request_id);
        kib_conn_t   *conn;
        unsigned long flags;
        int           idle;

        conn = tx->tx_conn;
        LASSERT (conn != NULL);
        LASSERT (tx->tx_sending != 0);

        spin_lock_irqsave(&conn->ibc_lock, flags);

        CDEBUG(D_NET, "conn %p tx %p [%d/%d]: %d\n", conn, tx,
               tx->tx_nsp - tx->tx_sending, tx->tx_nsp,
               e->status);

        /* I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'.  If it's
         * not me, then I take an extra ref on conn so it can't disappear
         * under me. */

        tx->tx_sending--;
        idle = (tx->tx_sending == 0) &&         /* This is the final callback */
               (!tx->tx_passive_rdma_wait);     /* Not waiting for RDMA completion */
        if (idle)
                list_del(&tx->tx_list);

        CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
               conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
               atomic_read (&conn->ibc_refcount));
        atomic_inc (&conn->ibc_refcount);

        if (tx->tx_sending == 0)
                conn->ibc_nsends_posted--;

        if (e->status != IB_COMPLETION_STATUS_SUCCESS &&
            tx->tx_status == 0)
                tx->tx_status = -ECONNABORTED;

        spin_unlock_irqrestore(&conn->ibc_lock, flags);

        if (idle)
                kibnal_tx_done (tx);

        if (e->status != IB_COMPLETION_STATUS_SUCCESS) {
                CERROR ("Tx completion to "LPX64" failed: %d\n",
                        conn->ibc_peer->ibp_nid, e->status);
                kibnal_close_conn (conn, -ENETDOWN);
        } else {
                /* can I shovel some more sends out the door? */
                kibnal_check_sends(conn);
        }

        kibnal_put_conn (conn);
}

void
kibnal_callback (struct ib_cq *cq, struct ib_cq_entry *e, void *arg)
{
        if (kibnal_wreqid_is_rx(e->work_request_id))
                kibnal_rx_callback (e);
        else
                kibnal_tx_callback (e);
}

void
kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob)
{
        struct ib_gather_scatter *gl = &tx->tx_gl[tx->tx_nsp];
        struct ib_send_param     *sp = &tx->tx_sp[tx->tx_nsp];
        int                       fence;
        int                       nob = offsetof (kib_msg_t, ibm_u) + body_nob;

        LASSERT (tx->tx_nsp >= 0 &&
                 tx->tx_nsp < sizeof(tx->tx_sp)/sizeof(tx->tx_sp[0]));
        LASSERT (nob <= IBNAL_MSG_SIZE);

        kibnal_init_msg(tx->tx_msg, type, body_nob);

        /* Fence the message if it's bundled with an RDMA read */
        fence = (tx->tx_nsp > 0) &&
                (type == IBNAL_MSG_PUT_DONE);

        *gl = (struct ib_gather_scatter) {
                .address = tx->tx_vaddr,
                .length  = nob,
                .key     = kibnal_data.kib_tx_pages->ibp_lkey,
        };

        /* NB If this is an RDMA read, the completion message must wait for
         * the RDMA to complete.  Sends wait for previous RDMA writes
         * anyway... */
        *sp = (struct ib_send_param) {
                .work_request_id      = kibnal_ptr2wreqid(tx, 0),
                .op                   = IB_OP_SEND,
                .gather_list          = gl,
                .num_gather_entries   = 1,
                .device_specific      = NULL,
                .solicited_event      = 1,
                .signaled             = 1,
                .immediate_data_valid = 0,
                .fence                = fence,
                .inline_data          = 0,
        };

        tx->tx_nsp++;
}

void
kibnal_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
{
        unsigned long         flags;

        spin_lock_irqsave(&conn->ibc_lock, flags);

        kibnal_queue_tx_locked (tx, conn);

        spin_unlock_irqrestore(&conn->ibc_lock, flags);

        kibnal_check_sends(conn);
}

void
kibnal_schedule_active_connect_locked (kib_peer_t *peer)
{
        /* Called with exclusive kib_global_lock */

        peer->ibp_connecting++;
        atomic_inc (&peer->ibp_refcount); /* extra ref for connd */

        spin_lock (&kibnal_data.kib_connd_lock);

        LASSERT (list_empty(&peer->ibp_connd_list));
        list_add_tail (&peer->ibp_connd_list,
                       &kibnal_data.kib_connd_peers);
        wake_up (&kibnal_data.kib_connd_waitq);

        spin_unlock (&kibnal_data.kib_connd_lock);
}

void
kibnal_launch_tx (kib_tx_t *tx, ptl_nid_t nid)
{
        unsigned long    flags;
        kib_peer_t      *peer;
        kib_conn_t      *conn;
        rwlock_t        *g_lock = &kibnal_data.kib_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
        LASSERT (tx->tx_nsp > 0);               /* work items have been set up */

        read_lock_irqsave(g_lock, flags);

        peer = kibnal_find_peer_locked (nid);
        if (peer == NULL) {
                read_unlock_irqrestore(g_lock, flags);
                tx->tx_status = -EHOSTUNREACH;
                kibnal_tx_done (tx);
                return;
        }

        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
                       conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
                       atomic_read (&conn->ibc_refcount));
                atomic_inc (&conn->ibc_refcount); /* 1 ref for the tx */
                read_unlock_irqrestore(g_lock, flags);

                kibnal_queue_tx (tx, conn);
                return;
        }

        /* Making one or more connections; I'll need a write lock... */
        read_unlock(g_lock);
        write_lock(g_lock);
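        /* NB interrupts stay disabled across the read -> write lock swap
         * above: read_unlock() doesn't restore 'flags', so they are only
         * re-enabled by the write_unlock_irqrestore() calls below.  The
         * lock was dropped, though, so the peer must be looked up again. */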

        peer = kibnal_find_peer_locked (nid);
        if (peer == NULL) {
                write_unlock_irqrestore (g_lock, flags);
                tx->tx_status = -EHOSTUNREACH;
                kibnal_tx_done (tx);
                return;
        }

        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
                       conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
                       atomic_read (&conn->ibc_refcount));
                atomic_inc (&conn->ibc_refcount); /* 1 ref for the tx */
                write_unlock_irqrestore (g_lock, flags);

                kibnal_queue_tx (tx, conn);
                return;
        }

        if (peer->ibp_connecting == 0) {
                if (!time_after_eq(jiffies, peer->ibp_reconnect_time)) {
                        write_unlock_irqrestore (g_lock, flags);
                        tx->tx_status = -EHOSTUNREACH;
                        kibnal_tx_done (tx);
                        return;
                }

                kibnal_schedule_active_connect_locked(peer);
        }

        /* A connection is being established; queue the message... */
        list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);

        write_unlock_irqrestore (g_lock, flags);
}

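/* Set up the passive side of an RDMA: map the local buffer and advertise
 * its rkey/addr/nob to the peer in a PUT_RDMA or GET_RDMA message, along
 * with a cookie to match against the eventual DONE.  The peer drives the
 * transfer: for a PUT it will RDMA-read from us (hence REMOTE_READ), for
 * a GET it will RDMA-write into us. */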
ptl_err_t
kibnal_start_passive_rdma (int type, ptl_nid_t nid,
                           lib_msg_t *libmsg, ptl_hdr_t *hdr)
{
        int         nob = libmsg->md->length;
        kib_tx_t   *tx;
        kib_msg_t  *ibmsg;
        int         rc;
        int         access;

        LASSERT (type == IBNAL_MSG_PUT_RDMA ||
                 type == IBNAL_MSG_GET_RDMA);
        LASSERT (nob > 0);
        LASSERT (!in_interrupt());              /* Mapping could block */

        if (type == IBNAL_MSG_PUT_RDMA) {
                access = IB_ACCESS_REMOTE_READ;
        } else {
                access = IB_ACCESS_REMOTE_WRITE |
                         IB_ACCESS_LOCAL_WRITE;
        }

        tx = kibnal_get_idle_tx (1);           /* May block; caller is an app thread */
        LASSERT (tx != NULL);

        if ((libmsg->md->options & PTL_MD_KIOV) == 0)
                rc = kibnal_map_iov (tx, access,
                                     libmsg->md->md_niov,
                                     libmsg->md->md_iov.iov,
                                     0, nob);
        else
                rc = kibnal_map_kiov (tx, access,
                                      libmsg->md->md_niov,
                                      libmsg->md->md_iov.kiov,
                                      0, nob);

        if (rc != 0) {
                CERROR ("Can't map RDMA for "LPX64": %d\n", nid, rc);
                goto failed;
        }

        if (type == IBNAL_MSG_GET_RDMA) {
                /* reply gets finalized when tx completes */
                tx->tx_libmsg[1] = lib_create_reply_msg(&kibnal_lib,
                                                        nid, libmsg);
                if (tx->tx_libmsg[1] == NULL) {
                        CERROR ("Can't create reply for GET -> "LPX64"\n",
                                nid);
                        rc = -ENOMEM;
                        goto failed;
                }
        }

        tx->tx_passive_rdma = 1;

        ibmsg = tx->tx_msg;

        ibmsg->ibm_u.rdma.ibrm_hdr = *hdr;
        ibmsg->ibm_u.rdma.ibrm_cookie = tx->tx_passive_rdma_cookie;
        ibmsg->ibm_u.rdma.ibrm_desc.rd_key = tx->tx_md.md_rkey;
        ibmsg->ibm_u.rdma.ibrm_desc.rd_addr = tx->tx_md.md_addr;
        ibmsg->ibm_u.rdma.ibrm_desc.rd_nob = nob;

        kibnal_init_tx_msg (tx, type, sizeof (kib_rdma_msg_t));

        CDEBUG(D_NET, "Passive: %p cookie "LPX64", key %x, addr "
               LPX64", nob %d\n",
               tx, tx->tx_passive_rdma_cookie, tx->tx_md.md_rkey,
               tx->tx_md.md_addr, nob);

        /* libmsg gets finalized when tx completes. */
        tx->tx_libmsg[0] = libmsg;

        kibnal_launch_tx(tx, nid);
        return (PTL_OK);

 failed:
        tx->tx_status = rc;
        kibnal_tx_done (tx);
        return (PTL_FAIL);
}

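/* Drive the active side of an RDMA: map the local buffer, post the RDMA
 * (a write for GET_DONE, a read for PUT_DONE) against the descriptor the
 * peer advertised, then send the DONE completion carrying the peer's
 * cookie.  kibnal_init_tx_msg() fences the DONE after an RDMA read, so
 * the peer only sees it once the data transfer has completed. */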
void
kibnal_start_active_rdma (int type, int status,
                          kib_rx_t *rx, lib_msg_t *libmsg,
                          unsigned int niov,
                          struct iovec *iov, ptl_kiov_t *kiov,
                          int offset, int nob)
{
        kib_msg_t    *rxmsg = rx->rx_msg;
        kib_msg_t    *txmsg;
        kib_tx_t     *tx;
        int           access;
        int           rdma_op;
        int           rc;

        CDEBUG(D_NET, "type %d, status %d, niov %d, offset %d, nob %d\n",
               type, status, niov, offset, nob);

        /* Called by scheduler */
        LASSERT (!in_interrupt ());

        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        /* No data if we're completing with failure */
        LASSERT (status == 0 || nob == 0);

        LASSERT (type == IBNAL_MSG_GET_DONE ||
                 type == IBNAL_MSG_PUT_DONE);

        /* Flag I'm completing the RDMA.  Even if I fail to send the
         * completion message, I will have tried my best so further
         * attempts shouldn't be tried. */
        LASSERT (!rx->rx_rdma);
        rx->rx_rdma = 1;

        if (type == IBNAL_MSG_GET_DONE) {
                access   = 0;
                rdma_op  = IB_OP_RDMA_WRITE;
                LASSERT (rxmsg->ibm_type == IBNAL_MSG_GET_RDMA);
        } else {
                access   = IB_ACCESS_LOCAL_WRITE;
                rdma_op  = IB_OP_RDMA_READ;
                LASSERT (rxmsg->ibm_type == IBNAL_MSG_PUT_RDMA);
        }

        tx = kibnal_get_idle_tx (0);           /* Mustn't block */
        if (tx == NULL) {
                CERROR ("tx descs exhausted on RDMA from "LPX64
                        " completing locally with failure\n",
                        rx->rx_conn->ibc_peer->ibp_nid);
                lib_finalize (&kibnal_lib, NULL, libmsg, PTL_NO_SPACE);
                return;
        }
        LASSERT (tx->tx_nsp == 0);

        if (nob != 0) {
                /* We actually need to transfer some data (the transfer
                 * size could get truncated to zero when the incoming
                 * message is matched) */

                if (kiov != NULL)
                        rc = kibnal_map_kiov (tx, access,
                                              niov, kiov, offset, nob);
                else
                        rc = kibnal_map_iov (tx, access,
                                             niov, iov, offset, nob);

                if (rc != 0) {
                        CERROR ("Can't map RDMA -> "LPX64": %d\n",
                                rx->rx_conn->ibc_peer->ibp_nid, rc);
                        /* We'll skip the RDMA and complete with failure. */
                        status = rc;
                        nob = 0;
                } else {
                        tx->tx_gl[0] = (struct ib_gather_scatter) {
                                .address = tx->tx_md.md_addr,
                                .length  = nob,
                                .key     = tx->tx_md.md_lkey,
                        };

                        tx->tx_sp[0] = (struct ib_send_param) {
                                .work_request_id      = kibnal_ptr2wreqid(tx, 0),
                                .op                   = rdma_op,
                                .gather_list          = &tx->tx_gl[0],
                                .num_gather_entries   = 1,
                                .remote_address       = rxmsg->ibm_u.rdma.ibrm_desc.rd_addr,
                                .rkey                 = rxmsg->ibm_u.rdma.ibrm_desc.rd_key,
                                .device_specific      = NULL,
                                .solicited_event      = 0,
                                .signaled             = 1,
                                .immediate_data_valid = 0,
                                .fence                = 0,
                                .inline_data          = 0,
                        };

                        tx->tx_nsp = 1;
                }
        }

        txmsg = tx->tx_msg;

        txmsg->ibm_u.completion.ibcm_cookie = rxmsg->ibm_u.rdma.ibrm_cookie;
        txmsg->ibm_u.completion.ibcm_status = status;

        kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));

        if (status == 0 && nob != 0) {
                LASSERT (tx->tx_nsp > 1);
                /* RDMA: libmsg gets finalized when the tx completes.  This
                 * is after the completion message has been sent, which in
                 * turn is after the RDMA has finished. */
                tx->tx_libmsg[0] = libmsg;
        } else {
                LASSERT (tx->tx_nsp == 1);
                /* No RDMA: local completion happens now! */
                CDEBUG(D_NET, "No data: immediate completion\n");
                lib_finalize (&kibnal_lib, NULL, libmsg,
                              status == 0 ? PTL_OK : PTL_FAIL);
        }

        /* +1 ref for this tx... */
        CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
               rx->rx_conn, rx->rx_conn->ibc_state,
               rx->rx_conn->ibc_peer->ibp_nid,
               atomic_read (&rx->rx_conn->ibc_refcount));
        atomic_inc (&rx->rx_conn->ibc_refcount);
        /* ...and queue it up */
        kibnal_queue_tx(tx, rx->rx_conn);
}

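/* Common send path.  Anything that fits in one IBNAL_MSG_SIZE buffer is
 * copied and sent as an IMMEDIATE message; PUTs and GETs with larger
 * payloads negotiate a passive RDMA instead, so bulk data moves by RDMA
 * rather than through the message buffers. */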
ptl_err_t
kibnal_sendmsg(lib_nal_t    *nal,
               void         *private,
               lib_msg_t    *libmsg,
               ptl_hdr_t    *hdr,
               int           type,
               ptl_nid_t     nid,
               ptl_pid_t     pid,
               unsigned int  payload_niov,
               struct iovec *payload_iov,
               ptl_kiov_t   *payload_kiov,
               int           payload_offset,
               int           payload_nob)
{
        kib_msg_t  *ibmsg;
        kib_tx_t   *tx;
        int         nob;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending %d bytes in %d frags to nid:"LPX64" pid %d\n",
               payload_nob, payload_niov, nid, pid);

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= PTL_MD_MAX_IOV);

        /* Thread context if we're sending payload */
        LASSERT (!in_interrupt() || payload_niov == 0);
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        switch (type) {
        default:
                LBUG();
                return (PTL_FAIL);

        case PTL_MSG_REPLY: {
                /* reply's 'private' is the incoming receive */
                kib_rx_t *rx = private;

                /* RDMA reply expected? */
                if (rx->rx_msg->ibm_type == IBNAL_MSG_GET_RDMA) {
                        kibnal_start_active_rdma(IBNAL_MSG_GET_DONE, 0,
                                                 rx, libmsg, payload_niov,
                                                 payload_iov, payload_kiov,
                                                 payload_offset, payload_nob);
                        return (PTL_OK);
                }

                /* Incoming message consistent with immediate reply? */
                if (rx->rx_msg->ibm_type != IBNAL_MSG_IMMEDIATE) {
                        CERROR ("REPLY to "LPX64" bad msg type %d!!!\n",
                                nid, rx->rx_msg->ibm_type);
                        return (PTL_FAIL);
                }

                /* Will it fit in a message? */
                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
                if (nob > IBNAL_MSG_SIZE) {
                        CERROR("REPLY for "LPX64" too big (RDMA not requested): %d\n",
                               nid, payload_nob);
                        return (PTL_FAIL);
                }
                break;
        }

        case PTL_MSG_GET:
                /* might the REPLY message be big enough to need RDMA? */
                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[libmsg->md->length]);
                if (nob > IBNAL_MSG_SIZE)
                        return (kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA,
                                                          nid, libmsg, hdr));
                break;

        case PTL_MSG_ACK:
                LASSERT (payload_nob == 0);
                break;

        case PTL_MSG_PUT:
                /* Is the payload big enough to need RDMA? */
                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
                if (nob > IBNAL_MSG_SIZE)
                        return (kibnal_start_passive_rdma(IBNAL_MSG_PUT_RDMA,
                                                          nid, libmsg, hdr));

                break;
        }

        tx = kibnal_get_idle_tx(!(type == PTL_MSG_ACK ||
                                  type == PTL_MSG_REPLY ||
                                  in_interrupt()));
        if (tx == NULL) {
                CERROR ("Can't send %d to "LPX64": tx descs exhausted%s\n",
                        type, nid, in_interrupt() ? " (intr)" : "");
                return (PTL_NO_SPACE);
        }

        ibmsg = tx->tx_msg;
        ibmsg->ibm_u.immediate.ibim_hdr = *hdr;

        if (payload_nob > 0) {
                if (payload_kiov != NULL)
                        lib_copy_kiov2buf(ibmsg->ibm_u.immediate.ibim_payload,
                                          payload_niov, payload_kiov,
                                          payload_offset, payload_nob);
                else
                        lib_copy_iov2buf(ibmsg->ibm_u.immediate.ibim_payload,
                                         payload_niov, payload_iov,
                                         payload_offset, payload_nob);
        }

        kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE,
                            offsetof(kib_immediate_msg_t,
                                     ibim_payload[payload_nob]));

        /* libmsg gets finalized when tx completes */
        tx->tx_libmsg[0] = libmsg;

        kibnal_launch_tx(tx, nid);
        return (PTL_OK);
}

1348 ptl_err_t
1349 kibnal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
1350                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1351                unsigned int payload_niov, struct iovec *payload_iov,
1352                size_t payload_offset, size_t payload_len)
1353 {
1354         return (kibnal_sendmsg(nal, private, cookie,
1355                                hdr, type, nid, pid,
1356                                payload_niov, payload_iov, NULL,
1357                                payload_offset, payload_len));
1358 }
1359
1360 ptl_err_t
1361 kibnal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, 
1362                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1363                      unsigned int payload_niov, ptl_kiov_t *payload_kiov, 
1364                      size_t payload_offset, size_t payload_len)
1365 {
1366         return (kibnal_sendmsg(nal, private, cookie,
1367                                hdr, type, nid, pid,
1368                                payload_niov, NULL, payload_kiov,
1369                                payload_offset, payload_len));
1370 }
1371
1372 ptl_err_t
1373 kibnal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
1374                  unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
1375                  int offset, int mlen, int rlen)
1376 {
1377         kib_rx_t    *rx = private;
1378         kib_msg_t   *rxmsg = rx->rx_msg;
1379         int          msg_nob;
1380         
1381         LASSERT (mlen <= rlen);
1382         LASSERT (!in_interrupt ());
1383         /* Either all pages or all vaddrs */
1384         LASSERT (!(kiov != NULL && iov != NULL));
1385
1386         switch (rxmsg->ibm_type) {
1387         default:
1388                 LBUG();
1389                 return (PTL_FAIL);
1390                 
1391         case IBNAL_MSG_IMMEDIATE:
1392                 msg_nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
1393                 if (msg_nob > IBNAL_MSG_SIZE) {
1394                         CERROR ("Immediate message from "LPX64" too big: %d\n",
1395                                 rxmsg->ibm_u.immediate.ibim_hdr.src_nid, rlen);
1396                         return (PTL_FAIL);
1397                 }
1398
1399                 if (kiov != NULL)
1400                         lib_copy_buf2kiov(niov, kiov, offset,
1401                                           rxmsg->ibm_u.immediate.ibim_payload,
1402                                           mlen);
1403                 else
1404                         lib_copy_buf2iov(niov, iov, offset,
1405                                          rxmsg->ibm_u.immediate.ibim_payload,
1406                                          mlen);
1407
1408                 lib_finalize (nal, NULL, libmsg, PTL_OK);
1409                 return (PTL_OK);
1410
1411         case IBNAL_MSG_GET_RDMA:
1412                 /* We get called here just to discard any junk after the
1413                  * GET hdr. */
1414                 LASSERT (libmsg == NULL);
1415                 lib_finalize (nal, NULL, libmsg, PTL_OK);
1416                 return (PTL_OK);
1417
1418         case IBNAL_MSG_PUT_RDMA:
1419                 kibnal_start_active_rdma (IBNAL_MSG_PUT_DONE, 0,
1420                                           rx, libmsg, 
1421                                           niov, iov, kiov, offset, mlen);
1422                 return (PTL_OK);
1423         }
1424 }
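
/* Aside (not from the original source): kibnal_recvmsg() sizes an
 * immediate message with offsetof() indexed into the trailing payload
 * array.  The compiled-out sketch below shows the same idiom on a
 * simplified stand-in struct; demo_msg_t and demo_msg_size() are
 * hypothetical names, and offsetof() comes from <linux/stddef.h>. */
#if 0
typedef struct {
        __u32 dm_magic;
        __u32 dm_nob;
        char  dm_payload[0];            /* payload follows the header */
} demo_msg_t;

static int
demo_msg_size (int payload_nob)
{
        /* offsetof(demo_msg_t, dm_payload[n]) == sizeof(header) + n:
         * exactly the bytes that must fit in a pre-posted rx buffer */
        return offsetof(demo_msg_t, dm_payload[payload_nob]);
}
#endif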
1425
1426 ptl_err_t
1427 kibnal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
1428               unsigned int niov, struct iovec *iov, 
1429               size_t offset, size_t mlen, size_t rlen)
1430 {
1431         return (kibnal_recvmsg (nal, private, msg, niov, iov, NULL,
1432                                 offset, mlen, rlen));
1433 }
1434
1435 ptl_err_t
1436 kibnal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
1437                      unsigned int niov, ptl_kiov_t *kiov, 
1438                      size_t offset, size_t mlen, size_t rlen)
1439 {
1440         return (kibnal_recvmsg (nal, private, msg, niov, NULL, kiov,
1441                                 offset, mlen, rlen));
1442 }
1443
1444 int
1445 kibnal_thread_start (int (*fn)(void *arg), void *arg)
1446 {
1447         long    pid = kernel_thread (fn, arg, 0);
1448
1449         if (pid < 0)
1450                 return ((int)pid);
1451
1452         atomic_inc (&kibnal_data.kib_nthreads);
1453         return (0);
1454 }
1455
1456 void
1457 kibnal_thread_fini (void)
1458 {
1459         atomic_dec (&kibnal_data.kib_nthreads);
1460 }
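
/* Aside (not from the original source): kibnal_thread_start() and
 * kibnal_thread_fini() bracket every daemon's lifetime with the
 * kib_nthreads counter.  A compiled-out sketch of the matching
 * shutdown side, which this module's teardown follows in spirit:
 * spin until the last thread has decremented the counter. */
#if 0
static void
demo_wait_for_threads (void)
{
        /* assumes kib_shutdown is already set and the waitqueues have
         * been woken, so every thread is on its way out */
        while (atomic_read (&kibnal_data.kib_nthreads) != 0) {
                set_current_state (TASK_INTERRUPTIBLE);
                schedule_timeout (HZ);
        }
}
#endif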
1461
1462 void
1463 kibnal_close_conn_locked (kib_conn_t *conn, int error)
1464 {
1465         /* This just does the immediate housekeeping, and schedules the
1466          * connection for the reaper to finish off.
1467          * Caller holds kib_global_lock exclusively in irq context */
1468         kib_peer_t   *peer = conn->ibc_peer;
1469
1470         CDEBUG (error == 0 ? D_NET : D_ERROR,
1471                 "closing conn to "LPX64": error %d\n", peer->ibp_nid, error);
1472         
1473         LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED ||
1474                  conn->ibc_state == IBNAL_CONN_CONNECTING);
1475
1476         if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
1477                 /* kib_reaper_conns takes ibc_list's ref */
1478                 list_del (&conn->ibc_list);
1479         } else {
1480                 /* new ref for kib_reaper_conns */
1481                 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1482                        conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1483                        atomic_read (&conn->ibc_refcount));
1484                 atomic_inc (&conn->ibc_refcount);
1485         }
1486         
1487         if (list_empty (&peer->ibp_conns) &&
1488             peer->ibp_persistence == 0) {
1489                 /* Non-persistent peer with no more conns... */
1490                 kibnal_unlink_peer_locked (peer);
1491         }
1492
1493         conn->ibc_state = IBNAL_CONN_DEATHROW;
1494
1495         /* Schedule conn for closing/destruction */
1496         spin_lock (&kibnal_data.kib_reaper_lock);
1497
1498         list_add_tail (&conn->ibc_list, &kibnal_data.kib_reaper_conns);
1499         wake_up (&kibnal_data.kib_reaper_waitq);
1500                 
1501         spin_unlock (&kibnal_data.kib_reaper_lock);
1502 }
1503
1504 int
1505 kibnal_close_conn (kib_conn_t *conn, int why)
1506 {
1507         unsigned long     flags;
1508         int               count = 0;
1509
1510         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1511
1512         LASSERT (conn->ibc_state >= IBNAL_CONN_CONNECTING);
1513         
1514         if (conn->ibc_state <= IBNAL_CONN_ESTABLISHED) {
1515                 count = 1;
1516                 kibnal_close_conn_locked (conn, why);
1517         }
1518         
1519         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1520         return (count);
1521 }
1522
1523 void
1524 kibnal_peer_connect_failed (kib_peer_t *peer, int active, int rc)
1525 {
1526         LIST_HEAD        (zombies);
1527         kib_tx_t         *tx;
1528         unsigned long     flags;
1529
1530         LASSERT (rc != 0);
1531         LASSERT (peer->ibp_reconnect_interval >= IBNAL_MIN_RECONNECT_INTERVAL);
1532
1533         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1534
1535         LASSERT (peer->ibp_connecting != 0);
1536         peer->ibp_connecting--;
1537
1538         if (peer->ibp_connecting != 0) {
1539                 /* another connection attempt under way... */
1540                 write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1541                 return;
1542         }
1543
1544         if (list_empty(&peer->ibp_conns)) {
1545                 /* Say when active connection can be re-attempted */
1546                 peer->ibp_reconnect_time = jiffies + peer->ibp_reconnect_interval;
1547                 /* Increase reconnection interval */
1548                 peer->ibp_reconnect_interval = MIN (peer->ibp_reconnect_interval * 2,
1549                                                     IBNAL_MAX_RECONNECT_INTERVAL);
1550         
1551                 /* Take peer's blocked transmits; I'll complete
1552                  * them with error */
1553                 while (!list_empty (&peer->ibp_tx_queue)) {
1554                         tx = list_entry (peer->ibp_tx_queue.next,
1555                                          kib_tx_t, tx_list);
1556                         
1557                         list_del (&tx->tx_list);
1558                         list_add_tail (&tx->tx_list, &zombies);
1559                 }
1560                 
1561                 if (kibnal_peer_active(peer) &&
1562                     (peer->ibp_persistence == 0)) {
1563                         /* failed connection attempt on non-persistent peer */
1564                         kibnal_unlink_peer_locked (peer);
1565                 }
1566         } else {
1567                 /* Can't have blocked transmits if there are connections */
1568                 LASSERT (list_empty(&peer->ibp_tx_queue));
1569         }
1570         
1571         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1572
1573         if (!list_empty (&zombies))
1574                 CERROR ("Deleting messages for "LPX64": connection failed\n",
1575                         peer->ibp_nid);
1576
1577         while (!list_empty (&zombies)) {
1578                 tx = list_entry (zombies.next, kib_tx_t, tx_list);
1579
1580                 list_del (&tx->tx_list);
1581                 /* complete now */
1582                 tx->tx_status = -EHOSTUNREACH;
1583                 kibnal_tx_done (tx);
1584         }
1585 }
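
/* Aside (not from the original source): on every failed attempt with
 * no live connection, kibnal_peer_connect_failed() doubles
 * ibp_reconnect_interval and clamps it at IBNAL_MAX_RECONNECT_INTERVAL.
 * Compiled-out sketch of that step; e.g. with a (hypothetical) 1s
 * minimum and 60s cap, successive failures back off 1, 2, 4, 8, 16,
 * 32, 60, 60, ... seconds until a connect succeeds and the interval
 * is reset to the minimum. */
#if 0
static unsigned long
demo_backoff (unsigned long interval)
{
        interval *= 2;
        if (interval > IBNAL_MAX_RECONNECT_INTERVAL)
                interval = IBNAL_MAX_RECONNECT_INTERVAL;
        return interval;
}
#endif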
1586
1587 void
1588 kibnal_connreq_done (kib_conn_t *conn, int active, int status)
1589 {
1590         int               state = conn->ibc_state;
1591         kib_peer_t       *peer = conn->ibc_peer;
1592         kib_tx_t         *tx;
1593         unsigned long     flags;
1594         int               rc;
1595         int               i;
1596
1597         /* passive connection has no connreq & vice versa */
1598         LASSERT (!active == !(conn->ibc_connreq != NULL));
1599         if (active) {
1600                 PORTAL_FREE (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
1601                 conn->ibc_connreq = NULL;
1602         }
1603
1604         if (state == IBNAL_CONN_CONNECTING) {
1605                 /* Install common (active/passive) callback for
1606                  * disconnect/idle notification if I got as far as getting
1607                  * a CM comm_id */
1608                 rc = tsIbCmCallbackModify(conn->ibc_comm_id, 
1609                                           kibnal_conn_callback, conn);
1610                 LASSERT (rc == 0);
1611         }
1612         
1613         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1614
1615         LASSERT (peer->ibp_connecting != 0);
1616         
1617         if (status == 0) {                         
1618                 /* connection established... */
1619                 LASSERT (state == IBNAL_CONN_CONNECTING);
1620                 if (!kibnal_peer_active(peer)) {
1621                         /* ...but peer deleted meantime */
1622                         status = -ECONNABORTED;
1623                 }
1624         } else {
1625                 LASSERT (state == IBNAL_CONN_INIT_QP ||
1626                          state == IBNAL_CONN_CONNECTING);
1627         }
1628
1629         if (status == 0) {
1630                 /* Everything worked! */
1631
1632                 peer->ibp_connecting--;
1633                 conn->ibc_state = IBNAL_CONN_ESTABLISHED;
1634
1635                 /* +1 ref for ibc_list; caller(== CM)'s ref remains until
1636                  * the IB_CM_IDLE callback */
1637                 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1638                        conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1639                        atomic_read (&conn->ibc_refcount));
1640                 atomic_inc (&conn->ibc_refcount);
1641                 list_add (&conn->ibc_list, &peer->ibp_conns);
1642                 
1643                 /* reset reconnect interval for next attempt */
1644                 peer->ibp_reconnect_interval = IBNAL_MIN_RECONNECT_INTERVAL;
1645
1646                 /* post blocked sends to the new connection */
1647                 spin_lock (&conn->ibc_lock);
1648                 
1649                 while (!list_empty (&peer->ibp_tx_queue)) {
1650                         tx = list_entry (peer->ibp_tx_queue.next, 
1651                                          kib_tx_t, tx_list);
1652                         
1653                         list_del (&tx->tx_list);
1654
1655                         /* +1 ref for each tx */
1656                         CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1657                                conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1658                                atomic_read (&conn->ibc_refcount));
1659                         atomic_inc (&conn->ibc_refcount);
1660                         kibnal_queue_tx_locked (tx, conn);
1661                 }
1662                 
1663                 spin_unlock (&conn->ibc_lock);
1664
1665                 /* Nuke any dangling conns from a different peer instance... */
1666                 kibnal_close_stale_conns_locked (conn->ibc_peer,
1667                                                  conn->ibc_incarnation);
1668
1669                 write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1670
1671                 /* queue up all the receives */
1672                 for (i = 0; i < IBNAL_RX_MSGS; i++) {
1673                         /* +1 ref for rx desc */
1674                         CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1675                                conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1676                                atomic_read (&conn->ibc_refcount));
1677                         atomic_inc (&conn->ibc_refcount);
1678
1679                         CDEBUG(D_NET, "RX[%d] %p->%p - "LPX64"\n",
1680                                i, &conn->ibc_rxs[i], conn->ibc_rxs[i].rx_msg,
1681                                conn->ibc_rxs[i].rx_vaddr);
1682
1683                         kibnal_post_rx (&conn->ibc_rxs[i], 0);
1684                 }
1685
1686                 kibnal_check_sends (conn);
1687                 return;
1688         }
1689
1690         /* connection failed */
1691         if (state == IBNAL_CONN_CONNECTING) {
1692                 /* schedule for reaper to close */
1693                 kibnal_close_conn_locked (conn, status);
1694         } else {
1695                 /* Don't have a CM comm_id; just wait for refs to drain */
1696                 conn->ibc_state = IBNAL_CONN_ZOMBIE;
1697         } 
1698
1699         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1700
1701         kibnal_peer_connect_failed (conn->ibc_peer, active, status);
1702
1703         if (state != IBNAL_CONN_CONNECTING) {
1704                 /* drop caller's ref if we're not waiting for the
1705                  * IB_CM_IDLE callback */
1706                 kibnal_put_conn (conn);
1707         }
1708 }
1709
1710 int
1711 kibnal_accept (kib_conn_t **connp, tTS_IB_CM_COMM_ID cid,
1712                kib_msg_t *msg, int nob)
1713 {
1714         kib_conn_t    *conn;
1715         kib_peer_t    *peer;
1716         kib_peer_t    *peer2;
1717         unsigned long  flags;
1718         int            rc;
1719
1720         rc = kibnal_unpack_msg(msg, nob);
1721         if (rc != 0) {
1722                 CERROR("Can't unpack connreq msg: %d\n", rc);
1723                 return -EPROTO;
1724         }
1725
1726         CDEBUG(D_NET, "connreq from "LPX64"\n", msg->ibm_srcnid);
1727
1728         if (msg->ibm_type != IBNAL_MSG_CONNREQ) {
1729                 CERROR("Unexpected connreq msg type: %x from "LPX64"\n",
1730                        msg->ibm_type, msg->ibm_srcnid);
1731                 return -EPROTO;
1732         }
1733                 
1734         if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
1735                 CERROR("Can't accept "LPX64": bad queue depth %d (%d expected)\n",
1736                        msg->ibm_srcnid, msg->ibm_u.connparams.ibcp_queue_depth, 
1737                        IBNAL_MSG_QUEUE_SIZE);
1738                 return (-EPROTO);
1739         }
1740         
1741         conn = kibnal_create_conn();
1742         if (conn == NULL)
1743                 return (-ENOMEM);
1744
1745         /* assume 'nid' is a new peer */
1746         peer = kibnal_create_peer (msg->ibm_srcnid);
1747         if (peer == NULL) {
1748                 CDEBUG(D_NET, "--conn[%p] state %d -> "LPX64" (%d)\n",
1749                        conn, conn->ibc_state, msg->ibm_srcnid, /* NB ibc_peer not set yet */
1750                        atomic_read (&conn->ibc_refcount));
1751                 atomic_dec (&conn->ibc_refcount);
1752                 kibnal_destroy_conn(conn);
1753                 return (-ENOMEM);
1754         }
1755         
1756         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1757
1758         /* Check I'm the same instance that gave the connection parameters.  
1759          * NB If my incarnation changes after this, the peer will get nuked and
1760          * we'll spot that when the connection is finally added into the peer's
1761          * connlist */
1762         if (msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid ||
1763             msg->ibm_dststamp != kibnal_data.kib_incarnation) {
1764                 write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1765                 
1766                 CERROR("Stale connection params from "LPX64"\n",
1767                        msg->ibm_srcnid);
1768                 atomic_dec(&conn->ibc_refcount);
1769                 kibnal_destroy_conn(conn);
1770                 kibnal_put_peer(peer);
1771                 return -ESTALE;
1772         }
1773
1774         peer2 = kibnal_find_peer_locked(msg->ibm_srcnid);
1775         if (peer2 == NULL) {
1776                 /* peer table takes my ref on peer */
1777                 list_add_tail (&peer->ibp_list,
1778                                kibnal_nid2peerlist(msg->ibm_srcnid));
1779         } else {
1780                 kibnal_put_peer (peer);
1781                 peer = peer2;
1782         }
1783
1784         /* +1 ref for conn */
1785         atomic_inc (&peer->ibp_refcount);
1786         peer->ibp_connecting++;
1787
1788         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1789
1790         conn->ibc_peer = peer;
1791         conn->ibc_state = IBNAL_CONN_CONNECTING;
1792         conn->ibc_comm_id = cid;
1793         conn->ibc_incarnation = msg->ibm_srcstamp;
1794         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
1795
1796         *connp = conn;
1797         return (0);
1798 }
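
/* Aside (not from the original source): kibnal_accept() creates the
 * peer *before* taking kib_global_lock (allocation can't happen under
 * an IRQ-safe spinlock) and then re-checks the table under the lock,
 * dropping the speculative peer if another thread inserted one first.
 * Compiled-out sketch of that lookup-or-insert step; demo_* names are
 * hypothetical. */
#if 0
static kib_peer_t *
demo_lookup_or_insert_locked (ptl_nid_t nid, kib_peer_t *candidate)
{
        /* caller holds kib_global_lock exclusively */
        kib_peer_t *peer2 = kibnal_find_peer_locked (nid);

        if (peer2 == NULL) {
                /* peer table takes the caller's ref on 'candidate' */
                list_add_tail (&candidate->ibp_list,
                               kibnal_nid2peerlist (nid));
                return candidate;
        }

        /* lost the race: drop the speculative peer, use the winner */
        kibnal_put_peer (candidate);
        return peer2;
}
#endif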
1799
1800 tTS_IB_CM_CALLBACK_RETURN
1801 kibnal_idle_conn_callback (tTS_IB_CM_EVENT event,
1802                             tTS_IB_CM_COMM_ID cid,
1803                             void *param,
1804                             void *arg)
1805 {
1806         /* Shouldn't ever get a callback after TS_IB_CM_IDLE */
1807         CERROR ("Unexpected event %d: conn %p\n", event, arg);
1808         LBUG ();
1809         return TS_IB_CM_CALLBACK_PROCEED;
1810 }
1811
1812 tTS_IB_CM_CALLBACK_RETURN
1813 kibnal_conn_callback (tTS_IB_CM_EVENT event,
1814                        tTS_IB_CM_COMM_ID cid,
1815                        void *param,
1816                        void *arg)
1817 {
1818         kib_conn_t       *conn = arg;
1819         LIST_HEAD        (zombies); 
1820         struct list_head *tmp;
1821         struct list_head *nxt;
1822         kib_tx_t         *tx;
1823         unsigned long     flags;
1824         int               done;
1825         int               rc;
1826
1827         /* Established Connection Notifier */
1828
1829         switch (event) {
1830         default:
1831                 CERROR("Connection %p -> "LPX64" ERROR %d\n",
1832                        conn, conn->ibc_peer->ibp_nid, event);
1833                 kibnal_close_conn (conn, -ECONNABORTED);
1834                 break;
1835                 
1836         case TS_IB_CM_DISCONNECTED:
1837                 CDEBUG(D_WARNING, "Connection %p -> "LPX64" DISCONNECTED.\n",
1838                        conn, conn->ibc_peer->ibp_nid);
1839                 kibnal_close_conn (conn, 0);
1840                 break;
1841
1842         case TS_IB_CM_IDLE:
1843                 CDEBUG(D_NET, "Connection %p -> "LPX64" IDLE.\n",
1844                        conn, conn->ibc_peer->ibp_nid);
1845                 kibnal_put_conn (conn);        /* Lose CM's ref */
1846
1847                 /* LASSERT (no further callbacks) */
1848                 rc = tsIbCmCallbackModify(cid, 
1849                                           kibnal_idle_conn_callback, conn);
1850                 LASSERT (rc == 0);
1851
1852                 /* NB we wait until the connection has closed before
1853                  * completing outstanding passive RDMAs so we can be sure
1854                  * the network can't touch the mapped memory any more. */
1855
1856                 spin_lock_irqsave (&conn->ibc_lock, flags);
1857
1858                 /* grab passive RDMAs not waiting for the tx callback */
1859                 list_for_each_safe (tmp, nxt, &conn->ibc_active_txs) {
1860                         tx = list_entry (tmp, kib_tx_t, tx_list);
1861
1862                         LASSERT (tx->tx_passive_rdma ||
1863                                  !tx->tx_passive_rdma_wait);
1864
1865                         LASSERT (tx->tx_passive_rdma_wait ||
1866                                  tx->tx_sending != 0);
1867
1868                         /* still waiting for tx callback? */
1869                         if (!tx->tx_passive_rdma_wait)
1870                                 continue;
1871
1872                         tx->tx_status = -ECONNABORTED;
1873                         tx->tx_passive_rdma_wait = 0;
1874                         done = (tx->tx_sending == 0);
1875
1876                         if (!done)
1877                                 continue;
1878
1879                         list_del (&tx->tx_list);
1880                         list_add (&tx->tx_list, &zombies);
1881                 }
1882
1883                 /* grab all blocked transmits */
1884                 list_for_each_safe (tmp, nxt, &conn->ibc_tx_queue) {
1885                         tx = list_entry (tmp, kib_tx_t, tx_list);
1886                         
1887                         list_del (&tx->tx_list);
1888                         list_add (&tx->tx_list, &zombies);
1889                 }
1890                 
1891                 spin_unlock_irqrestore (&conn->ibc_lock, flags);
1892
1893                 while (!list_empty(&zombies)) {
1894                         tx = list_entry (zombies.next, kib_tx_t, tx_list);
1895
1896                         list_del(&tx->tx_list);
1897                         kibnal_tx_done (tx);
1898                 }
1899                 break;
1900         }
1901
1902         return TS_IB_CM_CALLBACK_PROCEED;
1903 }
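
/* Aside (not from the original source): the IDLE case above uses the
 * module's standard "zombie list" idiom -- unlink everything onto a
 * private list while ibc_lock is held, then complete each tx only
 * after the lock is dropped, since kibnal_tx_done() may have mapped
 * memory to deregister.  Compiled-out sketch of the idiom: */
#if 0
static void
demo_abort_tx_queue (kib_conn_t *conn, struct list_head *queue)
{
        LIST_HEAD        (zombies);
        struct list_head *tmp;
        struct list_head *nxt;
        kib_tx_t         *tx;
        unsigned long     flags;

        spin_lock_irqsave (&conn->ibc_lock, flags);

        list_for_each_safe (tmp, nxt, queue) {
                tx = list_entry (tmp, kib_tx_t, tx_list);
                tx->tx_status = -ECONNABORTED;
                list_del (&tx->tx_list);
                list_add_tail (&tx->tx_list, &zombies);
        }

        spin_unlock_irqrestore (&conn->ibc_lock, flags);

        /* lock dropped: now safe to run completions */
        while (!list_empty (&zombies)) {
                tx = list_entry (zombies.next, kib_tx_t, tx_list);
                list_del (&tx->tx_list);
                kibnal_tx_done (tx);
        }
}
#endif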
1904
1905 tTS_IB_CM_CALLBACK_RETURN
1906 kibnal_passive_conn_callback (tTS_IB_CM_EVENT event,
1907                                tTS_IB_CM_COMM_ID cid,
1908                                void *param,
1909                                void *arg)
1910 {
1911         kib_conn_t  *conn = arg;
1912         int          rc;
1913         
1914         switch (event) {
1915         default:
1916                 if (conn == NULL) {
1917                         /* no connection yet */
1918                         CERROR ("Unexpected event: %d\n", event);
1919                         return TS_IB_CM_CALLBACK_ABORT;
1920                 }
1921                 
1922                 CERROR ("Unexpected event %p -> "LPX64": %d\n", 
1923                         conn, conn->ibc_peer->ibp_nid, event);
1924                 kibnal_connreq_done (conn, 0, -ECONNABORTED);
1925                 break;
1926                 
1927         case TS_IB_CM_REQ_RECEIVED: {
1928                 struct ib_cm_req_received_param *req = param;
1929                 kib_msg_t                       *msg = req->remote_private_data;
1930
1931                 LASSERT (conn == NULL);
1932
1933                 /* Don't really know srcnid until successful unpack */
1934                 CDEBUG(D_NET, "REQ from ?"LPX64"?\n", msg->ibm_srcnid);
1935
1936                 rc = kibnal_accept(&conn, cid, msg, 
1937                                    req->remote_private_data_len);
1938                 if (rc != 0) {
1939                         CERROR ("Can't accept ?"LPX64"?: %d\n",
1940                                 msg->ibm_srcnid, rc);
1941                         return TS_IB_CM_CALLBACK_ABORT;
1942                 }
1943
1944                 /* update 'arg' for next callback */
1945                 rc = tsIbCmCallbackModify(cid, kibnal_passive_conn_callback, conn);
1946                 LASSERT (rc == 0);
1947
1948                 msg = req->accept_param.reply_private_data;
1949                 kibnal_init_msg(msg, IBNAL_MSG_CONNACK,
1950                                 sizeof(msg->ibm_u.connparams));
1951
1952                 msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
1953
1954                 kibnal_pack_msg(msg, 0, 
1955                                 conn->ibc_peer->ibp_nid, 
1956                                 conn->ibc_incarnation);
1957
1958                 req->accept_param.qp                     = conn->ibc_qp;
1959                 req->accept_param.reply_private_data_len = msg->ibm_nob;
1960                 req->accept_param.responder_resources    = IBNAL_RESPONDER_RESOURCES;
1961                 req->accept_param.initiator_depth        = IBNAL_RESPONDER_RESOURCES;
1962                 req->accept_param.rnr_retry_count        = IBNAL_RNR_RETRY;
1963                 req->accept_param.flow_control           = IBNAL_FLOW_CONTROL;
1964
1965                 CDEBUG(D_NET, "Proceeding\n");
1966                 break;
1967         }
1968
1969         case TS_IB_CM_ESTABLISHED:
1970                 LASSERT (conn != NULL);
1971                 CDEBUG(D_WARNING, "Connection %p -> "LPX64" ESTABLISHED.\n",
1972                        conn, conn->ibc_peer->ibp_nid);
1973
1974                 kibnal_connreq_done (conn, 0, 0);
1975                 break;
1976         }
1977
1978         /* NB if the connreq is done, we switch to kibnal_conn_callback */
1979         return TS_IB_CM_CALLBACK_PROCEED;
1980 }
1981
1982 tTS_IB_CM_CALLBACK_RETURN
1983 kibnal_active_conn_callback (tTS_IB_CM_EVENT event,
1984                               tTS_IB_CM_COMM_ID cid,
1985                               void *param,
1986                               void *arg)
1987 {
1988         kib_conn_t    *conn = arg;
1989         unsigned long  flags;
1990
1991         switch (event) {
1992         case TS_IB_CM_REP_RECEIVED: {
1993                 struct ib_cm_rep_received_param *rep = param;
1994                 kib_msg_t                       *msg = rep->remote_private_data;
1995                 int                              nob = rep->remote_private_data_len;
1996                 int                              rc;
1997
1998                 rc = kibnal_unpack_msg(msg, nob);
1999                 if (rc != 0) {
2000                         CERROR ("Error %d unpacking conn ack from "LPX64"\n",
2001                                 rc, conn->ibc_peer->ibp_nid);
2002                         kibnal_connreq_done (conn, 1, rc);
2003                         break;
2004                 }
2005
2006                 if (msg->ibm_type != IBNAL_MSG_CONNACK) {
2007                         CERROR ("Unexpected conn ack type %d from "LPX64"\n",
2008                                 msg->ibm_type, conn->ibc_peer->ibp_nid);
2009                         kibnal_connreq_done (conn, 1, -EPROTO);
2010                         break;
2011                 }
2012
2013                 if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
2014                     msg->ibm_srcstamp != conn->ibc_incarnation ||
2015                     msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid ||
2016                     msg->ibm_dststamp != kibnal_data.kib_incarnation) {
2017                         CERROR("Stale conn ack from "LPX64"\n",
2018                                conn->ibc_peer->ibp_nid);
2019                         kibnal_connreq_done (conn, 1, -ESTALE);
2020                         break;
2021                 }
2022
2023                 if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
2024                         CERROR ("Bad queue depth %d from "LPX64"\n",
2025                                 msg->ibm_u.connparams.ibcp_queue_depth,
2026                                 conn->ibc_peer->ibp_nid);
2027                         kibnal_connreq_done (conn, 1, -EPROTO);
2028                         break;
2029                 }
2030                                 
2031                 CDEBUG(D_NET, "Connection %p -> "LPX64" REP_RECEIVED.\n",
2032                        conn, conn->ibc_peer->ibp_nid);
2033
2034                 conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2035                 break;
2036         }
2037
2038         case TS_IB_CM_ESTABLISHED:
2039                 CDEBUG(D_WARNING, "Connection %p -> "LPX64" ESTABLISHED\n",
2040                        conn, conn->ibc_peer->ibp_nid);
2041
2042                 kibnal_connreq_done (conn, 1, 0);
2043                 break;
2044
2045         case TS_IB_CM_IDLE:
2046                 CERROR("Connection %p -> "LPX64" IDLE\n",
2047                        conn, conn->ibc_peer->ibp_nid);
2048                 /* I assume this connection attempt was rejected because the
2049                  * peer found a stale QP; I'll just try again */
2050                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2051                 kibnal_schedule_active_connect_locked(conn->ibc_peer);
2052                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2053
2054                 /* Back out state change: this conn disengaged from CM */
2055                 conn->ibc_state = IBNAL_CONN_INIT_QP;
2056                 
2057                 kibnal_connreq_done (conn, 1, -ECONNABORTED);
2058                 break;
2059
2060         default:
2061                 CERROR("Connection %p -> "LPX64" ERROR %d\n",
2062                        conn, conn->ibc_peer->ibp_nid, event);
2063                 kibnal_connreq_done (conn, 1, -ECONNABORTED);
2064                 break;
2065         }
2066
2067         /* NB if the connreq is done, we switch to kibnal_conn_callback */
2068         return TS_IB_CM_CALLBACK_PROCEED;
2069 }
2070
2071 int
2072 kibnal_pathreq_callback (tTS_IB_CLIENT_QUERY_TID tid, int status,
2073                           struct ib_path_record *resp, int remaining,
2074                           void *arg)
2075 {
2076         kib_conn_t *conn = arg;
2077         kib_peer_t *peer = conn->ibc_peer;
2078         kib_msg_t  *msg = &conn->ibc_connreq->cr_msg;
2079
2080         if (status != 0) {
2081                 CERROR ("status %d\n", status);
2082                 kibnal_connreq_done (conn, 1, status);
2083                 goto out;
2084         }
2085
2086         conn->ibc_connreq->cr_path = *resp;
2087
2088         kibnal_init_msg(msg, IBNAL_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
2089         msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
2090         kibnal_pack_msg(msg, 0, peer->ibp_nid, conn->ibc_incarnation);
2091
2092         conn->ibc_connreq->cr_connparam = (struct ib_cm_active_param) {
2093                 .qp                   = conn->ibc_qp,
2094                 .req_private_data     = msg,
2095                 .req_private_data_len = msg->ibm_nob,
2096                 .responder_resources  = IBNAL_RESPONDER_RESOURCES,
2097                 .initiator_depth      = IBNAL_RESPONDER_RESOURCES,
2098                 .retry_count          = IBNAL_RETRY,
2099                 .rnr_retry_count      = IBNAL_RNR_RETRY,
2100                 .cm_response_timeout  = kibnal_tunables.kib_io_timeout,
2101                 .max_cm_retries       = IBNAL_CM_RETRY,
2102                 .flow_control         = IBNAL_FLOW_CONTROL,
2103         };
2104
2105         /* XXX set timeout just like SDP!!! */
2106         conn->ibc_connreq->cr_path.packet_life = 13;
2107         
2108         /* Flag I'm getting involved with the CM... */
2109         conn->ibc_state = IBNAL_CONN_CONNECTING;
2110
2111         CDEBUG(D_NET, "Connecting to, service id "LPX64", on "LPX64"\n",
2112                conn->ibc_connreq->cr_svcrsp.ibsr_svc_id, peer->ibp_nid);
2113
2114         /* kibnal_active_conn_callback gets my conn ref */
2115         status = ib_cm_connect (&conn->ibc_connreq->cr_connparam, 
2116                                 &conn->ibc_connreq->cr_path, NULL,
2117                                 conn->ibc_connreq->cr_svcrsp.ibsr_svc_id, 0,
2118                                 kibnal_active_conn_callback, conn,
2119                                 &conn->ibc_comm_id);
2120         if (status != 0) {
2121                 CERROR ("Connect: %d\n", status);
2122                 /* Back out state change: I've not got a CM comm_id yet... */
2123                 conn->ibc_state = IBNAL_CONN_INIT_QP;
2124                 kibnal_connreq_done (conn, 1, status);
2125         }
2126         
2127  out:
2128         /* return non-zero to prevent further callbacks */
2129         return 1;
2130 }
2131
2132 void
2133 kibnal_connect_peer (kib_peer_t *peer)
2134 {
2135         kib_conn_t  *conn;
2136         int          rc;
2137
2138         conn = kibnal_create_conn();
2139         if (conn == NULL) {
2140                 CERROR ("Can't allocate conn\n");
2141                 kibnal_peer_connect_failed (peer, 1, -ENOMEM);
2142                 return;
2143         }
2144
2145         conn->ibc_peer = peer;
2146         atomic_inc (&peer->ibp_refcount);
2147
2148         PORTAL_ALLOC (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
2149         if (conn->ibc_connreq == NULL) {
2150                 CERROR ("Can't allocate connreq\n");
2151                 kibnal_connreq_done (conn, 1, -ENOMEM);
2152                 return;
2153         }
2154
2155         memset(conn->ibc_connreq, 0, sizeof (*conn->ibc_connreq));
2156
2157         rc = kibnal_make_svcqry(conn);
2158         if (rc != 0) {
2159                 kibnal_connreq_done (conn, 1, rc);
2160                 return;
2161         }
2162
2163         rc = ib_cached_gid_get(kibnal_data.kib_device,
2164                                kibnal_data.kib_port, 0,
2165                                conn->ibc_connreq->cr_gid);
2166         LASSERT (rc == 0);
2167
2168         /* kibnal_pathreq_callback gets my conn ref */
2169         rc = tsIbPathRecordRequest (kibnal_data.kib_device,
2170                                     kibnal_data.kib_port,
2171                                     conn->ibc_connreq->cr_gid,
2172                                     conn->ibc_connreq->cr_svcrsp.ibsr_svc_gid,
2173                                     conn->ibc_connreq->cr_svcrsp.ibsr_svc_pkey,
2174                                     0,
2175                                     kibnal_tunables.kib_io_timeout * HZ,
2176                                     0,
2177                                     kibnal_pathreq_callback, conn, 
2178                                     &conn->ibc_connreq->cr_tid);
2179         if (rc == 0)
2180                 return;
2181
2182         CERROR ("Path record request: %d\n", rc);
2183         kibnal_connreq_done (conn, 1, rc);
2184 }
2185
2186 int
2187 kibnal_conn_timed_out (kib_conn_t *conn)
2188 {
2189         kib_tx_t          *tx;
2190         struct list_head  *ttmp;
2191         unsigned long      flags;
2192
2193         spin_lock_irqsave (&conn->ibc_lock, flags);
2194
2195         list_for_each (ttmp, &conn->ibc_tx_queue) {
2196                 tx = list_entry (ttmp, kib_tx_t, tx_list);
2197
2198                 LASSERT (!tx->tx_passive_rdma_wait);
2199                 LASSERT (tx->tx_sending == 0);
2200
2201                 if (time_after_eq (jiffies, tx->tx_deadline)) {
2202                         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2203                         return 1;
2204                 }
2205         }
2206
2207         list_for_each (ttmp, &conn->ibc_active_txs) {
2208                 tx = list_entry (ttmp, kib_tx_t, tx_list);
2209
2210                 LASSERT (tx->tx_passive_rdma ||
2211                          !tx->tx_passive_rdma_wait);
2212
2213                 LASSERT (tx->tx_passive_rdma_wait ||
2214                          tx->tx_sending != 0);
2215
2216                 if (time_after_eq (jiffies, tx->tx_deadline)) {
2217                         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2218                         return 1;
2219                 }
2220         }
2221
2222         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2223
2224         return 0;
2225 }
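
/* Aside (not from the original source): the deadline tests above use
 * time_after_eq(), which compares jiffy counters through signed
 * arithmetic so they stay correct across a jiffies wrap.  Compiled-out
 * sketch of arming and testing a tx deadline (the arming site is
 * elsewhere in this module; the interval shown matches the tunable
 * the reaper uses). */
#if 0
static int
demo_tx_expired (kib_tx_t *tx)
{
        /* armed when the tx was queued:
         *   tx->tx_deadline = jiffies + kibnal_tunables.kib_io_timeout * HZ;
         * time_after_eq(a, b) is in effect ((long)(a - b) >= 0) */
        return time_after_eq (jiffies, tx->tx_deadline);
}
#endif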
2226
2227 void
2228 kibnal_check_conns (int idx)
2229 {
2230         struct list_head  *peers = &kibnal_data.kib_peers[idx];
2231         struct list_head  *ptmp;
2232         kib_peer_t        *peer;
2233         kib_conn_t        *conn;
2234         struct list_head  *ctmp;
2235         unsigned long      flags;
2236
2237  again:
2238         /* NB. We expect to have a look at all the peers and not find any
2239          * rdmas to time out, so we just use a shared lock while we
2240          * take a look... */
2241         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2242
2243         list_for_each (ptmp, peers) {
2244                 peer = list_entry (ptmp, kib_peer_t, ibp_list);
2245
2246                 list_for_each (ctmp, &peer->ibp_conns) {
2247                         conn = list_entry (ctmp, kib_conn_t, ibc_list);
2248
2249                         LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);
2250
2251
2252                         /* In case we have enough credits to return via a
2253                          * NOOP, but there were no non-blocking tx descs
2254                          * free to do it last time... */
2255                         kibnal_check_sends(conn);
2256
2257                         if (!kibnal_conn_timed_out(conn))
2258                                 continue;
2259                         
2260                         CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
2261                                conn, conn->ibc_state, peer->ibp_nid,
2262                                atomic_read (&conn->ibc_refcount));
2263
2264                         atomic_inc (&conn->ibc_refcount);
2265                         read_unlock_irqrestore(&kibnal_data.kib_global_lock,
2266                                                flags);
2267
2268                         CERROR("Timed out RDMA with "LPX64"\n",
2269                                peer->ibp_nid);
2270
2271                         kibnal_close_conn (conn, -ETIMEDOUT);
2272                         kibnal_put_conn (conn);
2273
2274                         /* start again now I've dropped the lock */
2275                         goto again;
2276                 }
2277         }
2278
2279         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2280 }
2281
2282 void
2283 kibnal_terminate_conn (kib_conn_t *conn)
2284 {
2285         int           rc;
2286
2287         CDEBUG(D_NET, "conn %p\n", conn);
2288         LASSERT (conn->ibc_state == IBNAL_CONN_DEATHROW);
2289         conn->ibc_state = IBNAL_CONN_ZOMBIE;
2290
2291         rc = ib_cm_disconnect (conn->ibc_comm_id);
2292         if (rc != 0)
2293                 CERROR ("Error %d disconnecting conn %p -> "LPX64"\n",
2294                         rc, conn, conn->ibc_peer->ibp_nid);
2295 }
2296
2297 int
2298 kibnal_reaper (void *arg)
2299 {
2300         wait_queue_t       wait;
2301         unsigned long      flags;
2302         kib_conn_t        *conn;
2303         int                timeout;
2304         int                i;
2305         int                peer_index = 0;
2306         unsigned long      deadline = jiffies;
2307         
2308         kportal_daemonize ("kibnal_reaper");
2309         kportal_blockallsigs ();
2310
2311         init_waitqueue_entry (&wait, current);
2312
2313         spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
2314
2315         while (!kibnal_data.kib_shutdown) {
2316                 if (!list_empty (&kibnal_data.kib_reaper_conns)) {
2317                         conn = list_entry (kibnal_data.kib_reaper_conns.next,
2318                                            kib_conn_t, ibc_list);
2319                         list_del (&conn->ibc_list);
2320                         
2321                         spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);
2322
2323                         switch (conn->ibc_state) {
2324                         case IBNAL_CONN_DEATHROW:
2325                                 LASSERT (conn->ibc_comm_id != TS_IB_CM_COMM_ID_INVALID);
2326                                 /* Disconnect: conn becomes a zombie in the
2327                                  * callback and last ref reschedules it
2328                                  * here... */
2329                                 kibnal_terminate_conn(conn);
2330                                 kibnal_put_conn (conn);
2331                                 break;
2332                                 
2333                         case IBNAL_CONN_ZOMBIE:
2334                                 kibnal_destroy_conn (conn);
2335                                 break;
2336                                 
2337                         default:
2338                                 CERROR ("Bad conn %p state: %d\n",
2339                                         conn, conn->ibc_state);
2340                                 LBUG();
2341                         }
2342
2343                         spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
2344                         continue;
2345                 }
2346
2347                 spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);
2348
2349                 /* careful with the jiffy wrap... */
2350                 while ((timeout = (int)(deadline - jiffies)) <= 0) {
2351                         const int n = 4;
2352                         const int p = 1;
2353                         int       chunk = kibnal_data.kib_peer_hash_size;
2354                         
2355                         /* Time to check for RDMA timeouts on a few more
2356                          * peers: I do checks every 'p' seconds on a
2357                          * proportion of the peer table and I need to check
2358                          * every connection 'n' times within a timeout
2359                          * interval, to ensure I detect a timeout on any
2360                          * connection within (n+1)/n times the timeout
2361                          * interval. */
2362
2363                         if (kibnal_tunables.kib_io_timeout > n * p)
2364                                 chunk = (chunk * n * p) / 
2365                                         kibnal_tunables.kib_io_timeout;
2366                         if (chunk == 0)
2367                                 chunk = 1;
2368
2369                         for (i = 0; i < chunk; i++) {
2370                                 kibnal_check_conns (peer_index);
2371                                 peer_index = (peer_index + 1) % 
2372                                              kibnal_data.kib_peer_hash_size;
2373                         }
2374
2375                         deadline += p * HZ;
2376                 }
2377
2378                 kibnal_data.kib_reaper_waketime = jiffies + timeout;
2379
2380                 set_current_state (TASK_INTERRUPTIBLE);
2381                 add_wait_queue (&kibnal_data.kib_reaper_waitq, &wait);
2382
2383                 schedule_timeout (timeout);
2384
2385                 set_current_state (TASK_RUNNING);
2386                 remove_wait_queue (&kibnal_data.kib_reaper_waitq, &wait);
2387
2388                 spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
2389         }
2390
2391         spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);
2392
2393         kibnal_thread_fini ();
2394         return (0);
2395 }
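
/* Aside (not from the original source): a worked example of the chunk
 * arithmetic in kibnal_reaper(), with hypothetical numbers.  With a
 * 101-bucket peer hash, n = 4, p = 1 and kib_io_timeout = 50s:
 *
 *         chunk = (101 * 4 * 1) / 50 = 8 buckets per pass
 *
 * so one pass per second covers the whole table every 13 passes,
 * i.e. roughly 4 sweeps per 50s timeout interval, as the comment in
 * the function requires.  Compiled-out restatement of the step: */
#if 0
static int
demo_chunk (int hash_size, int n, int p, int io_timeout)
{
        int chunk = hash_size;

        if (io_timeout > n * p)
                chunk = (chunk * n * p) / io_timeout;
        if (chunk == 0)
                chunk = 1;

        return chunk;
}
#endif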
2396
2397 int
2398 kibnal_connd (void *arg)
2399 {
2400         long               id = (long)arg;
2401         char               name[16];
2402         wait_queue_t       wait;
2403         unsigned long      flags;
2404         kib_peer_t        *peer;
2405         kib_acceptsock_t  *as;
2406         int                did_something;
2407
2408         snprintf(name, sizeof(name), "kibnal_connd_%02ld", id);
2409         kportal_daemonize(name);
2410         kportal_blockallsigs();
2411
2412         init_waitqueue_entry (&wait, current);
2413
2414         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2415
2416         while (!kibnal_data.kib_shutdown) {
2417                 did_something = 0;
2418
2419                 if (!list_empty (&kibnal_data.kib_connd_acceptq)) {
2420                         as = list_entry (kibnal_data.kib_connd_acceptq.next,
2421                                          kib_acceptsock_t, ibas_list);
2422                         list_del (&as->ibas_list);
2423                         
2424                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2425
2426                         kibnal_handle_svcqry(as->ibas_sock);
2427                         sock_release(as->ibas_sock);
2428                         PORTAL_FREE(as, sizeof(*as));
2429                         
2430                         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2431                         did_something = 1;
2432                 }
2433                         
2434                 if (!list_empty (&kibnal_data.kib_connd_peers)) {
2435                         peer = list_entry (kibnal_data.kib_connd_peers.next,
2436                                            kib_peer_t, ibp_connd_list);
2437                         
2438                         list_del_init (&peer->ibp_connd_list);
2439                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2440
2441                         kibnal_connect_peer (peer);
2442                         kibnal_put_peer (peer);
2443
2444                         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
2445                         did_something = 1;
2446                 }
2447
2448                 if (did_something)
2449                         continue;
2450
2451                 set_current_state (TASK_INTERRUPTIBLE);
2452                 add_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
2453
2454                 spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2455
2456                 schedule();
2457
2458                 set_current_state (TASK_RUNNING);
2459                 remove_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
2460
2461                 spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
2462         }
2463
2464         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2465
2466         kibnal_thread_fini ();
2467         return (0);
2468 }
2469
2470 int
2471 kibnal_scheduler(void *arg)
2472 {
2473         long            id = (long)arg;
2474         char            name[16];
2475         kib_rx_t       *rx;
2476         kib_tx_t       *tx;
2477         unsigned long   flags;
2478         int             rc;
2479         int             counter = 0;
2480         int             did_something;
2481
2482         snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
2483         kportal_daemonize(name);
2484         kportal_blockallsigs();
2485
2486         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
2487
2488         while (!kibnal_data.kib_shutdown) {
2489                 did_something = 0;
2490
2491                 while (!list_empty(&kibnal_data.kib_sched_txq)) {
2492                         tx = list_entry(kibnal_data.kib_sched_txq.next,
2493                                         kib_tx_t, tx_list);
2494                         list_del(&tx->tx_list);
2495                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
2496                                                flags);
2497                         kibnal_tx_done(tx);
2498
2499                         spin_lock_irqsave(&kibnal_data.kib_sched_lock,
2500                                           flags);
2501                 }
2502
2503                 if (!list_empty(&kibnal_data.kib_sched_rxq)) {
2504                         rx = list_entry(kibnal_data.kib_sched_rxq.next,
2505                                         kib_rx_t, rx_list);
2506                         list_del(&rx->rx_list);
2507                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
2508                                                flags);
2509
2510                         kibnal_rx(rx);
2511
2512                         did_something = 1;
2513                         spin_lock_irqsave(&kibnal_data.kib_sched_lock,
2514                                           flags);
2515                 }
2516
2517                 /* nothing to do or hogging CPU */
2518                 if (!did_something || counter++ == IBNAL_RESCHED) {
2519                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
2520                                                flags);
2521                         counter = 0;
2522
2523                         if (!did_something) {
2524                                 rc = wait_event_interruptible(
2525                                         kibnal_data.kib_sched_waitq,
2526                                         !list_empty(&kibnal_data.kib_sched_txq) || 
2527                                         !list_empty(&kibnal_data.kib_sched_rxq) || 
2528                                         kibnal_data.kib_shutdown);
2529                         } else {
2530                                 our_cond_resched();
2531                         }
2532
2533                         spin_lock_irqsave(&kibnal_data.kib_sched_lock,
2534                                           flags);
2535                 }
2536         }
2537
2538         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
2539
2540         kibnal_thread_fini();
2541         return (0);
2542 }
2543
2544
2545 lib_nal_t kibnal_lib = {
2546         libnal_data:        &kibnal_data,      /* NAL private data */
2547         libnal_send:         kibnal_send,
2548         libnal_send_pages:   kibnal_send_pages,
2549         libnal_recv:         kibnal_recv,
2550         libnal_recv_pages:   kibnal_recv_pages,
2551         libnal_dist:         kibnal_dist
2552 };
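
/* Aside (not from the original source): the initialiser above uses the
 * old GNU C "field: value" designated-initialiser syntax.  The C99
 * equivalent, shown compiled out, is: */
#if 0
lib_nal_t kibnal_lib_c99_style = {
        .libnal_data       = &kibnal_data,      /* NAL private data */
        .libnal_send       = kibnal_send,
        .libnal_send_pages = kibnal_send_pages,
        .libnal_recv       = kibnal_recv,
        .libnal_recv_pages = kibnal_recv_pages,
        .libnal_dist       = kibnal_dist,
};
#endif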