* 5630 fix takes ibnal global lock at raised IRQ priority
[fs/lustre-release.git] / lnet/klnds/openiblnd/openiblnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include "openibnal.h"
25
26 /*
27  *  LIB functions follow
28  *
29  */
30 void
31 kibnal_schedule_tx_done (kib_tx_t *tx)
32 {
33         unsigned long flags;
34
35         spin_lock_irqsave (&kibnal_data.kib_sched_lock, flags);
36
37         list_add_tail(&tx->tx_list, &kibnal_data.kib_sched_txq);
38         wake_up (&kibnal_data.kib_sched_waitq);
39
40         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
41 }
42
43 void
44 kibnal_tx_done (kib_tx_t *tx)
45 {
46         ptl_err_t        ptlrc = (tx->tx_status == 0) ? PTL_OK : PTL_FAIL;
47         unsigned long    flags;
48         int              i;
49         int              rc;
50
51         LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting callback */
52         LASSERT (!tx->tx_passive_rdma_wait);    /* mustn't be awaiting RDMA */
53
54         switch (tx->tx_mapped) {
55         default:
56                 LBUG();
57
58         case KIB_TX_UNMAPPED:
59                 break;
60                 
61         case KIB_TX_MAPPED:
62                 if (in_interrupt()) {
63                         /* can't deregister memory in IRQ context... */
64                         kibnal_schedule_tx_done(tx);
65                         return;
66                 }
67                 rc = ib_memory_deregister(tx->tx_md.md_handle.mr);
68                 LASSERT (rc == 0);
69                 tx->tx_mapped = KIB_TX_UNMAPPED;
70                 break;
71
72 #if IBNAL_FMR
73         case KIB_TX_MAPPED_FMR:
74                 if (in_interrupt() && tx->tx_status != 0) {
75                         /* can't flush FMRs in IRQ context... */
76                         kibnal_schedule_tx_done(tx);
77                         return;
78                 }              
79
80                 rc = ib_fmr_deregister(tx->tx_md.md_handle.fmr);
81                 LASSERT (rc == 0);
82
83                 if (tx->tx_status != 0)
84                         ib_fmr_pool_force_flush(kibnal_data.kib_fmr_pool);
85                 tx->tx_mapped = KIB_TX_UNMAPPED;
86                 break;
87 #endif
88         }
89
90         for (i = 0; i < 2; i++) {
91                 /* tx may have up to 2 libmsgs to finalise */
92                 if (tx->tx_libmsg[i] == NULL)
93                         continue;
94
95                 lib_finalize (&kibnal_lib, NULL, tx->tx_libmsg[i], ptlrc);
96                 tx->tx_libmsg[i] = NULL;
97         }
98         
99         if (tx->tx_conn != NULL) {
100                 kibnal_put_conn (tx->tx_conn);
101                 tx->tx_conn = NULL;
102         }
103
104         tx->tx_nsp = 0;
105         tx->tx_passive_rdma = 0;
106         tx->tx_status = 0;
107
108         spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);
109
110         if (tx->tx_isnblk) {
111                 list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_nblk_txs);
112         } else {
113                 list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_txs);
114                 wake_up (&kibnal_data.kib_idle_tx_waitq);
115         }
116
117         spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
118 }
119
120 kib_tx_t *
121 kibnal_get_idle_tx (int may_block) 
122 {
123         unsigned long  flags;
124         kib_tx_t      *tx = NULL;
125         
126         for (;;) {
127                 spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);
128
129                 /* "normal" descriptor is free */
130                 if (!list_empty (&kibnal_data.kib_idle_txs)) {
131                         tx = list_entry (kibnal_data.kib_idle_txs.next,
132                                          kib_tx_t, tx_list);
133                         break;
134                 }
135
136                 if (!may_block) {
137                         /* may dip into reserve pool */
138                         if (list_empty (&kibnal_data.kib_idle_nblk_txs)) {
139                                 CERROR ("reserved tx desc pool exhausted\n");
140                                 break;
141                         }
142
143                         tx = list_entry (kibnal_data.kib_idle_nblk_txs.next,
144                                          kib_tx_t, tx_list);
145                         break;
146                 }
147
148                 /* block for idle tx */
149                 spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
150
151                 wait_event (kibnal_data.kib_idle_tx_waitq,
152                             !list_empty (&kibnal_data.kib_idle_txs) ||
153                             kibnal_data.kib_shutdown);
154         }
155
156         if (tx != NULL) {
157                 list_del (&tx->tx_list);
158
159                 /* Allocate a new passive RDMA completion cookie.  It might
160                  * not be needed, but we've got a lock right now and we're
161                  * unlikely to wrap... */
162                 tx->tx_passive_rdma_cookie = kibnal_data.kib_next_tx_cookie++;
163
164                 LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
165                 LASSERT (tx->tx_nsp == 0);
166                 LASSERT (tx->tx_sending == 0);
167                 LASSERT (tx->tx_status == 0);
168                 LASSERT (tx->tx_conn == NULL);
169                 LASSERT (!tx->tx_passive_rdma);
170                 LASSERT (!tx->tx_passive_rdma_wait);
171                 LASSERT (tx->tx_libmsg[0] == NULL);
172                 LASSERT (tx->tx_libmsg[1] == NULL);
173         }
174
175         spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
176         
177         return (tx);
178 }
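/* Two idle-tx pools: callers that may block (app threads) wait on
 * kib_idle_txs; callers that mustn't block may fall back to the reserved
 * kib_idle_nblk_txs pool, so contexts like the scheduler sending RDMA
 * completions can still get a descriptor when the normal pool is empty. */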
179
180 int
181 kibnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
182 {
183         /* I would guess that if kibnal_get_peer (nid) == NULL,
184            and we're not routing, then 'nid' is very distant :) */
185         if ( nal->libnal_ni.ni_pid.nid == nid ) {
186                 *dist = 0;
187         } else {
188                 *dist = 1;
189         }
190
191         return 0;
192 }
193
194 void
195 kibnal_complete_passive_rdma(kib_conn_t *conn, __u64 cookie, int status)
196 {
197         struct list_head *ttmp;
198         unsigned long     flags;
199         int               idle;
200
201         spin_lock_irqsave (&conn->ibc_lock, flags);
202
203         list_for_each (ttmp, &conn->ibc_active_txs) {
204                 kib_tx_t *tx = list_entry(ttmp, kib_tx_t, tx_list);
205
206                 LASSERT (tx->tx_passive_rdma ||
207                          !tx->tx_passive_rdma_wait);
208
209                 LASSERT (tx->tx_passive_rdma_wait ||
210                          tx->tx_sending != 0);
211
212                 if (!tx->tx_passive_rdma_wait ||
213                     tx->tx_passive_rdma_cookie != cookie)
214                         continue;
215
216                 CDEBUG(D_NET, "Complete %p "LPD64": %d\n", tx, cookie, status);
217
218                 tx->tx_status = status;
219                 tx->tx_passive_rdma_wait = 0;
220                 idle = (tx->tx_sending == 0);
221
222                 if (idle)
223                         list_del (&tx->tx_list);
224
225                 spin_unlock_irqrestore (&conn->ibc_lock, flags);
226
227                 /* I could be racing with tx callbacks.  It's whoever
228                  * _makes_ tx idle that frees it */
229                 if (idle)
230                         kibnal_tx_done (tx);
231                 return;
232         }
233                 
234         spin_unlock_irqrestore (&conn->ibc_lock, flags);
235
236         CERROR ("Unmatched (late?) RDMA completion "LPX64" from "LPX64"\n",
237                 cookie, conn->ibc_peer->ibp_nid);
238 }
239
240 void
241 kibnal_post_rx (kib_rx_t *rx, int do_credits)
242 {
243         kib_conn_t   *conn = rx->rx_conn;
244         int           rc;
245         unsigned long flags;
246
247         rx->rx_gl = (struct ib_gather_scatter) {
248                 .address = rx->rx_vaddr,
249                 .length  = IBNAL_MSG_SIZE,
250                 .key     = conn->ibc_rx_pages->ibp_lkey,
251         };
252
253         rx->rx_sp = (struct ib_receive_param) {
254                 .work_request_id        = kibnal_ptr2wreqid(rx, 1),
255                 .scatter_list           = &rx->rx_gl,
256                 .num_scatter_entries    = 1,
257                 .device_specific        = NULL,
258                 .signaled               = 1,
259         };
260
261         LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
262         LASSERT (!rx->rx_posted);
263         rx->rx_posted = 1;
264         mb();
265
266         if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
267                 rc = -ECONNABORTED;
268         else
269                 rc = ib_receive (conn->ibc_qp, &rx->rx_sp, 1);
270
271         if (rc == 0) {
272                 if (do_credits) {
273                         spin_lock_irqsave(&conn->ibc_lock, flags);
274                         conn->ibc_outstanding_credits++;
275                         spin_unlock_irqrestore(&conn->ibc_lock, flags);
276
277                         kibnal_check_sends(conn);
278                 }
279                 return;
280         }
281
282         if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
283                 CERROR ("Error posting receive -> "LPX64": %d\n",
284                         conn->ibc_peer->ibp_nid, rc);
285                 kibnal_close_conn (rx->rx_conn, rc);
286         } else {
287                 CDEBUG (D_NET, "Error posting receive -> "LPX64": %d\n",
288                         conn->ibc_peer->ibp_nid, rc);
289         }
290
291         /* Drop rx's ref */
292         kibnal_put_conn (conn);
293 }
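/* NB posting a receive with do_credits != 0 means a buffer has been freed
 * up for the peer: ibc_outstanding_credits is bumped and the credit is
 * piggybacked on whatever kibnal_check_sends() transmits next. */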
294
295 void
296 kibnal_rx_callback (struct ib_cq_entry *e)
297 {
298         kib_rx_t     *rx = (kib_rx_t *)kibnal_wreqid2ptr(e->work_request_id);
299         kib_msg_t    *msg = rx->rx_msg;
300         kib_conn_t   *conn = rx->rx_conn;
301         int           credits;
302         unsigned long flags;
303         int           rc;
304
305         CDEBUG (D_NET, "rx %p conn %p\n", rx, conn);
306         LASSERT (rx->rx_posted);
307         rx->rx_posted = 0;
308         mb();
309
310         /* receives complete with error in any case after we've started
311          * closing the QP */
312         if (conn->ibc_state >= IBNAL_CONN_DEATHROW)
313                 goto failed;
314
315         /* We don't post receives until the conn is established */
316         LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);
317
318         if (e->status != IB_COMPLETION_STATUS_SUCCESS) {
319                 CERROR("Rx from "LPX64" failed: %d\n", 
320                        conn->ibc_peer->ibp_nid, e->status);
321                 goto failed;
322         }
323
324         rc = kibnal_unpack_msg(msg, e->bytes_transferred);
325         if (rc != 0) {
326                 CERROR ("Error %d unpacking rx from "LPX64"\n",
327                         rc, conn->ibc_peer->ibp_nid);
328                 goto failed;
329         }
330
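        /* A connection is bound to a peer NID and an incarnation (stamp);
         * anything from an earlier instance of the peer, or addressed to an
         * earlier instance of this NAL, is treated as stale and rejected
         * below. */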
331         if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
332             msg->ibm_srcstamp != conn->ibc_incarnation ||
333             msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid ||
334             msg->ibm_dststamp != kibnal_data.kib_incarnation) {
335                 CERROR ("Stale rx from "LPX64"\n",
336                         conn->ibc_peer->ibp_nid);
337                 goto failed;
338         }
339
340         /* Have I received credits that will let me send? */
341         credits = msg->ibm_credits;
342         if (credits != 0) {
343                 spin_lock_irqsave(&conn->ibc_lock, flags);
344                 conn->ibc_credits += credits;
345                 spin_unlock_irqrestore(&conn->ibc_lock, flags);
346                 
347                 kibnal_check_sends(conn);
348         }
349
350         switch (msg->ibm_type) {
351         case IBNAL_MSG_NOOP:
352                 kibnal_post_rx (rx, 1);
353                 return;
354
355         case IBNAL_MSG_IMMEDIATE:
356                 break;
357                 
358         case IBNAL_MSG_PUT_RDMA:
359         case IBNAL_MSG_GET_RDMA:
360                 CDEBUG(D_NET, "%d RDMA: cookie "LPX64", key %x, addr "LPX64", nob %d\n",
361                        msg->ibm_type, msg->ibm_u.rdma.ibrm_cookie,
362                        msg->ibm_u.rdma.ibrm_desc.rd_key,
363                        msg->ibm_u.rdma.ibrm_desc.rd_addr,
364                        msg->ibm_u.rdma.ibrm_desc.rd_nob);
365                 break;
366                 
367         case IBNAL_MSG_PUT_DONE:
368         case IBNAL_MSG_GET_DONE:
369                 CDEBUG(D_NET, "%d DONE: cookie "LPX64", status %d\n",
370                        msg->ibm_type, msg->ibm_u.completion.ibcm_cookie,
371                        msg->ibm_u.completion.ibcm_status);
372
373                 kibnal_complete_passive_rdma (conn, 
374                                               msg->ibm_u.completion.ibcm_cookie,
375                                               msg->ibm_u.completion.ibcm_status);
376                 kibnal_post_rx (rx, 1);
377                 return;
378                         
379         default:
380                 CERROR ("Bad msg type %x from "LPX64"\n",
381                         msg->ibm_type, conn->ibc_peer->ibp_nid);
382                 goto failed;
383         }
384
385         /* schedule for kibnal_rx() in thread context */
386         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
387         
388         list_add_tail (&rx->rx_list, &kibnal_data.kib_sched_rxq);
389         wake_up (&kibnal_data.kib_sched_waitq);
390         
391         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
392         return;
393         
394  failed:
395         CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
396         kibnal_close_conn(conn, -ECONNABORTED);
397
398         /* Don't re-post rx & drop its ref on conn */
399         kibnal_put_conn(conn);
400 }
401
402 void
403 kibnal_rx (kib_rx_t *rx)
404 {
405         kib_msg_t   *msg = rx->rx_msg;
406
407         /* Clear flag so I can detect if I've sent an RDMA completion */
408         rx->rx_rdma = 0;
409
410         switch (msg->ibm_type) {
411         case IBNAL_MSG_GET_RDMA:
412                 lib_parse(&kibnal_lib, &msg->ibm_u.rdma.ibrm_hdr, rx);
413                 /* If the incoming get was matched, I'll have initiated the
414                  * RDMA and the completion message... */
415                 if (rx->rx_rdma)
416                         break;
417
418                 /* Otherwise, I'll send a failed completion now to prevent
419                  * the peer's GET blocking for the full timeout. */
420                 CERROR ("Completing unmatched RDMA GET from "LPX64"\n",
421                         rx->rx_conn->ibc_peer->ibp_nid);
422                 kibnal_start_active_rdma (IBNAL_MSG_GET_DONE, -EIO,
423                                           rx, NULL, 0, NULL, NULL, 0, 0);
424                 break;
425                 
426         case IBNAL_MSG_PUT_RDMA:
427                 lib_parse(&kibnal_lib, &msg->ibm_u.rdma.ibrm_hdr, rx);
428                 if (rx->rx_rdma)
429                         break;
430                 /* This is most unusual, since even if lib_parse() didn't
431                  * match anything, it should have asked us to read (and
432                  * discard) the payload.  The portals header must be
433                  * inconsistent with this message type, so it's the
434                  * sender's fault for sending garbage and she can time
435                  * herself out... */
436                 CERROR ("Uncompleted RDMA PUT from "LPX64"\n",
437                         rx->rx_conn->ibc_peer->ibp_nid);
438                 break;
439
440         case IBNAL_MSG_IMMEDIATE:
441                 lib_parse(&kibnal_lib, &msg->ibm_u.immediate.ibim_hdr, rx);
442                 LASSERT (!rx->rx_rdma);
443                 break;
444                 
445         default:
446                 LBUG();
447                 break;
448         }
449
450         kibnal_post_rx (rx, 1);
451 }
452
453 #if 0
454 int
455 kibnal_kvaddr_to_phys (unsigned long vaddr, __u64 *physp)
456 {
457         struct page *page;
458
459         if (vaddr >= VMALLOC_START &&
460             vaddr < VMALLOC_END)
461                 page = vmalloc_to_page ((void *)vaddr);
462 #if CONFIG_HIGHMEM
463         else if (vaddr >= PKMAP_BASE &&
464                  vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
465                 page = vmalloc_to_page ((void *)vaddr);
466         /* in 2.4 ^ just walks the page tables */
467 #endif
468         else
469                 page = virt_to_page (vaddr);
470
471         if (page == NULL ||
472             !VALID_PAGE (page))
473                 return (-EFAULT);
474
475         *physp = kibnal_page2phys(page) + (vaddr & (PAGE_SIZE - 1));
476         return (0);
477 }
478 #endif
479
480 int
481 kibnal_map_iov (kib_tx_t *tx, enum ib_memory_access access,
482                  int niov, struct iovec *iov, int offset, int nob)
483                  
484 {
485         void   *vaddr;
486         int     rc;
487
488         LASSERT (nob > 0);
489         LASSERT (niov > 0);
490         LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
491
492         while (offset >= iov->iov_len) {
493                 offset -= iov->iov_len;
494                 niov--;
495                 iov++;
496                 LASSERT (niov > 0);
497         }
498
499         if (nob > iov->iov_len - offset) {
500                 CERROR ("Can't map multiple vaddr fragments\n");
501                 return (-EMSGSIZE);
502         }
503
504         vaddr = (void *)(((unsigned long)iov->iov_base) + offset);
505         tx->tx_md.md_addr = (__u64)((unsigned long)vaddr);
506
507         rc = ib_memory_register (kibnal_data.kib_pd,
508                                  vaddr, nob,
509                                  access,
510                                  &tx->tx_md.md_handle.mr,
511                                  &tx->tx_md.md_lkey,
512                                  &tx->tx_md.md_rkey);
513         
514         if (rc != 0) {
515                 CERROR ("Can't map vaddr: %d\n", rc);
516                 return (rc);
517         }
518
519         tx->tx_mapped = KIB_TX_MAPPED;
520         return (0);
521 }
522
523 int
524 kibnal_map_kiov (kib_tx_t *tx, enum ib_memory_access access,
525                   int nkiov, ptl_kiov_t *kiov,
526                   int offset, int nob)
527 {
528 #if IBNAL_FMR
529         __u64                      *phys;
530         const int                   mapped = KIB_TX_MAPPED_FMR;
531 #else
532         struct ib_physical_buffer  *phys;
533         const int                   mapped = KIB_TX_MAPPED;
534 #endif
535         int                         page_offset;
536         int                         nphys;
537         int                         resid;
538         int                         phys_size;
539         int                         rc;
540
541         CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
542
543         LASSERT (nob > 0);
544         LASSERT (nkiov > 0);
545         LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
546
547         while (offset >= kiov->kiov_len) {
548                 offset -= kiov->kiov_len;
549                 nkiov--;
550                 kiov++;
551                 LASSERT (nkiov > 0);
552         }
553
554         phys_size = nkiov * sizeof (*phys);
555         PORTAL_ALLOC(phys, phys_size);
556         if (phys == NULL) {
557                 CERROR ("Can't allocate tmp phys\n");
558                 return (-ENOMEM);
559         }
560
561         page_offset = kiov->kiov_offset + offset;
562 #if IBNAL_FMR
563         phys[0] = kibnal_page2phys(kiov->kiov_page);
564 #else
565         phys[0].address = kibnal_page2phys(kiov->kiov_page);
566         phys[0].size = PAGE_SIZE;
567 #endif
568         nphys = 1;
569         resid = nob - (kiov->kiov_len - offset);
570
571         while (resid > 0) {
572                 kiov++;
573                 nkiov--;
574                 LASSERT (nkiov > 0);
575
576                 if (kiov->kiov_offset != 0 ||
577                     ((resid > PAGE_SIZE) && 
578                      kiov->kiov_len < PAGE_SIZE)) {
579                         int i;
580                         /* Can't have gaps */
581                         CERROR ("Can't make payload contiguous in I/O VM: "
582                                 "page %d, offset %d, len %d\n", nphys, 
583                                 kiov->kiov_offset, kiov->kiov_len);
584
585                         for (i = -nphys; i < nkiov; i++) 
586                         {
587                                 CERROR("kiov[%d] %p +%d for %d\n",
588                                        i, kiov[i].kiov_page, kiov[i].kiov_offset, kiov[i].kiov_len);
589                         }
590                         
591                         rc = -EINVAL;
592                         goto out;
593                 }
594
595                 if (nphys == PTL_MD_MAX_IOV) {
596                         CERROR ("payload too big (%d)\n", nphys);
597                         rc = -EMSGSIZE;
598                         goto out;
599                 }
600
601                 LASSERT (nphys * sizeof (*phys) < phys_size);
602 #if IBNAL_FMR
603                 phys[nphys] = kibnal_page2phys(kiov->kiov_page);
604 #else
605                 phys[nphys].address = kibnal_page2phys(kiov->kiov_page);
606                 phys[nphys].size = PAGE_SIZE;
607 #endif
608                 nphys++;
609
610                 resid -= PAGE_SIZE;
611         }
612
613         tx->tx_md.md_addr = IBNAL_RDMA_BASE;
614
615 #if IBNAL_FMR
616         rc = ib_fmr_register_physical (kibnal_data.kib_fmr_pool,
617                                        phys, nphys,
618                                        &tx->tx_md.md_addr,
619                                        page_offset,
620                                        &tx->tx_md.md_handle.fmr,
621                                        &tx->tx_md.md_lkey,
622                                        &tx->tx_md.md_rkey);
623 #else
624         rc = ib_memory_register_physical (kibnal_data.kib_pd,
625                                           phys, nphys,
626                                           &tx->tx_md.md_addr,
627                                           nob, page_offset,
628                                           access,
629                                           &tx->tx_md.md_handle.mr,
630                                           &tx->tx_md.md_lkey,
631                                           &tx->tx_md.md_rkey);
632 #endif
633         if (rc == 0) {
634                 CDEBUG(D_NET, "Mapped %d pages %d bytes @ offset %d: lkey %x, rkey %x\n",
635                        nphys, nob, page_offset, tx->tx_md.md_lkey, tx->tx_md.md_rkey);
636                 tx->tx_mapped = mapped;
637         } else {
638                 CERROR ("Can't map phys: %d\n", rc);
639                 rc = -EFAULT;
640         }
641
642  out:
643         PORTAL_FREE(phys, phys_size);
644         return (rc);
645 }
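/* Worked example of the contiguity rule above (illustrative only): a 3-page
 * payload maps to phys[0..2], one PAGE_SIZE entry per page, with page_offset
 * covering the start of the first fragment.  If any later fragment has a
 * non-zero kiov_offset, or an interior fragment is shorter than a page, the
 * region can't be presented to the HCA as one contiguous mapping and the
 * call fails with -EINVAL. */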
646
647 kib_conn_t *
648 kibnal_find_conn_locked (kib_peer_t *peer)
649 {
650         struct list_head *tmp;
651
652         /* just return the first connection */
653         list_for_each (tmp, &peer->ibp_conns) {
654                 return (list_entry(tmp, kib_conn_t, ibc_list));
655         }
656
657         return (NULL);
658 }
659
660 void
661 kibnal_check_sends (kib_conn_t *conn)
662 {
663         unsigned long   flags;
664         kib_tx_t       *tx;
665         int             rc;
666         int             i;
667         int             done;
668         int             nwork;
669
670         spin_lock_irqsave (&conn->ibc_lock, flags);
671
672         LASSERT (conn->ibc_nsends_posted <= IBNAL_MSG_QUEUE_SIZE);
673
674         if (list_empty(&conn->ibc_tx_queue) &&
675             conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER) {
676                 spin_unlock_irqrestore(&conn->ibc_lock, flags);
677                 
678                 tx = kibnal_get_idle_tx(0);     /* don't block */
679                 if (tx != NULL)
680                         kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);
681
682                 spin_lock_irqsave(&conn->ibc_lock, flags);
683                 
684                 if (tx != NULL) {
685                         atomic_inc(&conn->ibc_refcount);
686                         kibnal_queue_tx_locked(tx, conn);
687                 }
688         }
689
690         while (!list_empty (&conn->ibc_tx_queue)) {
691                 tx = list_entry (conn->ibc_tx_queue.next, kib_tx_t, tx_list);
692
693                 /* We rely on this for QP sizing */
694                 LASSERT (tx->tx_nsp > 0 && tx->tx_nsp <= 2);
695
696                 LASSERT (conn->ibc_outstanding_credits >= 0);
697                 LASSERT (conn->ibc_outstanding_credits <= IBNAL_MSG_QUEUE_SIZE);
698                 LASSERT (conn->ibc_credits >= 0);
699                 LASSERT (conn->ibc_credits <= IBNAL_MSG_QUEUE_SIZE);
700
701                 /* Not on ibc_rdma_queue */
702                 LASSERT (!tx->tx_passive_rdma_wait);
703
704                 if (conn->ibc_nsends_posted == IBNAL_MSG_QUEUE_SIZE)
705                         break;
706
707                 if (conn->ibc_credits == 0)     /* no credits */
708                         break;
709                 
710                 if (conn->ibc_credits == 1 &&   /* last credit reserved for */
711                     conn->ibc_outstanding_credits == 0) /* giving back credits */
712                         break;
713
714                 list_del (&tx->tx_list);
715
716                 if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
717                     (!list_empty(&conn->ibc_tx_queue) ||
718                      conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER)) {
719                         /* redundant NOOP */
720                         spin_unlock_irqrestore(&conn->ibc_lock, flags);
721                         kibnal_tx_done(tx);
722                         spin_lock_irqsave(&conn->ibc_lock, flags);
723                         continue;
724                 }
725
726                 kibnal_pack_msg(tx->tx_msg, conn->ibc_outstanding_credits,
727                                 conn->ibc_peer->ibp_nid, conn->ibc_incarnation);
728
729                 conn->ibc_outstanding_credits = 0;
730                 conn->ibc_nsends_posted++;
731                 conn->ibc_credits--;
732
733                 tx->tx_sending = tx->tx_nsp;
734                 tx->tx_passive_rdma_wait = tx->tx_passive_rdma;
735                 list_add (&tx->tx_list, &conn->ibc_active_txs);
736
737                 spin_unlock_irqrestore (&conn->ibc_lock, flags);
738
739                 /* NB the gap between removing tx from the queue and sending it
740                  * allows message re-ordering to occur */
741
742                 LASSERT (tx->tx_nsp > 0);
743
744                 rc = -ECONNABORTED;
745                 nwork = 0;
746                 if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
747                         tx->tx_status = 0;
748                         /* Driver only accepts 1 item at a time */
749                         for (i = 0; i < tx->tx_nsp; i++) {
750                                 rc = ib_send (conn->ibc_qp, &tx->tx_sp[i], 1);
751                                 if (rc != 0)
752                                         break;
753                                 nwork++;
754                         }
755                 }
756
757                 spin_lock_irqsave (&conn->ibc_lock, flags);
758                 if (rc != 0) {
759                         /* NB credits are transferred in the actual
760                          * message, which can only be the last work item */
761                         conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
762                         conn->ibc_credits++;
763                         conn->ibc_nsends_posted--;
764
765                         tx->tx_status = rc;
766                         tx->tx_passive_rdma_wait = 0;
767                         tx->tx_sending -= tx->tx_nsp - nwork;
768
769                         done = (tx->tx_sending == 0);
770                         if (done)
771                                 list_del (&tx->tx_list);
772                         
773                         spin_unlock_irqrestore (&conn->ibc_lock, flags);
774                         
775                         if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
776                                 CERROR ("Error %d posting transmit to "LPX64"\n", 
777                                         rc, conn->ibc_peer->ibp_nid);
778                         else
779                                 CDEBUG (D_NET, "Error %d posting transmit to "
780                                         LPX64"\n", rc, conn->ibc_peer->ibp_nid);
781
782                         kibnal_close_conn (conn, rc);
783
784                         if (done)
785                                 kibnal_tx_done (tx);
786                         return;
787                 }
788                 
789         }
790
791         spin_unlock_irqrestore (&conn->ibc_lock, flags);
792 }
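/* Credit accounting above, in brief: ibc_credits is how many sends the peer
 * can currently accept; ibc_outstanding_credits is how many receive buffers
 * have been re-posted locally and are owed back to the peer.  Each send
 * consumes one credit and returns every outstanding credit in its header
 * (the counter is zeroed after kibnal_pack_msg()).  The last credit is only
 * spent on a message that also returns credits, and if nothing is queued
 * while IBNAL_CREDIT_HIGHWATER or more credits are owed, a NOOP is generated
 * purely to return them. */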
793
794 void
795 kibnal_tx_callback (struct ib_cq_entry *e)
796 {
797         kib_tx_t     *tx = (kib_tx_t *)kibnal_wreqid2ptr(e->work_request_id);
798         kib_conn_t   *conn;
799         unsigned long flags;
800         int           idle;
801
802         conn = tx->tx_conn;
803         LASSERT (conn != NULL);
804         LASSERT (tx->tx_sending != 0);
805
806         spin_lock_irqsave(&conn->ibc_lock, flags);
807
808         CDEBUG(D_NET, "conn %p tx %p [%d/%d]: %d\n", conn, tx,
809                tx->tx_nsp - tx->tx_sending, tx->tx_nsp,
810                e->status);
811
812         /* I could be racing with rdma completion.  Whoever makes 'tx' idle
813          * gets to free it, which also drops its ref on 'conn'.  If it's
814          * not me, then I take an extra ref on conn so it can't disappear
815          * under me. */
816
817         tx->tx_sending--;
818         idle = (tx->tx_sending == 0) &&         /* This is the final callback */
819                (!tx->tx_passive_rdma_wait);     /* Not waiting for RDMA completion */
820         if (idle)
821                 list_del(&tx->tx_list);
822
823         CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
824                conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
825                atomic_read (&conn->ibc_refcount));
826         atomic_inc (&conn->ibc_refcount);
827
828         if (tx->tx_sending == 0)
829                 conn->ibc_nsends_posted--;
830
831         if (e->status != IB_COMPLETION_STATUS_SUCCESS &&
832             tx->tx_status == 0)
833                 tx->tx_status = -ECONNABORTED;
834                 
835         spin_unlock_irqrestore(&conn->ibc_lock, flags);
836
837         if (idle)
838                 kibnal_tx_done (tx);
839
840         if (e->status != IB_COMPLETION_STATUS_SUCCESS) {
841                 CERROR ("Tx completion to "LPX64" failed: %d\n", 
842                         conn->ibc_peer->ibp_nid, e->status);
843                 kibnal_close_conn (conn, -ENETDOWN);
844         } else {
845                 /* can I shovel some more sends out the door? */
846                 kibnal_check_sends(conn);
847         }
848
849         kibnal_put_conn (conn);
850 }
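/* Descriptor ownership rule (see also kibnal_complete_passive_rdma): a tx
 * may be waited on by up to tx_nsp send completions plus one passive RDMA
 * completion.  Whichever callback observes it go idle (tx_sending == 0 and
 * !tx_passive_rdma_wait) unlinks it from ibc_active_txs and calls
 * kibnal_tx_done(), which in turn drops the tx's ref on the connection. */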
851
852 void
853 kibnal_callback (struct ib_cq *cq, struct ib_cq_entry *e, void *arg)
854 {
855         if (kibnal_wreqid_is_rx(e->work_request_id))
856                 kibnal_rx_callback (e);
857         else
858                 kibnal_tx_callback (e);
859 }
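/* kibnal_ptr2wreqid()/kibnal_wreqid2ptr()/kibnal_wreqid_is_rx() are defined
 * in openibnal.h.  A minimal sketch of one plausible encoding -- an
 * illustration, not necessarily the header's actual implementation -- is
 * below: tx/rx descriptors are word aligned, so the low bit of the 64-bit
 * work_request_id is free to carry the rx flag. */
#if 0
static inline __u64
kibnal_ptr2wreqid_sketch (void *ptr, int isrx)
{
        unsigned long lptr = (unsigned long)ptr;

        LASSERT ((lptr & 1UL) == 0);    /* descriptor must be even-aligned */
        return (__u64)(lptr | (isrx ? 1UL : 0UL));
}

static inline void *
kibnal_wreqid2ptr_sketch (__u64 wreqid)
{
        return (void *)((unsigned long)wreqid & ~1UL);
}

static inline int
kibnal_wreqid_is_rx_sketch (__u64 wreqid)
{
        return ((wreqid & 1) != 0);
}
#endif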
860
861 void
862 kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob)
863 {
864         struct ib_gather_scatter *gl = &tx->tx_gl[tx->tx_nsp];
865         struct ib_send_param     *sp = &tx->tx_sp[tx->tx_nsp];
866         int                       fence;
867         int                       nob = offsetof (kib_msg_t, ibm_u) + body_nob;
868
869         LASSERT (tx->tx_nsp >= 0 && 
870                  tx->tx_nsp < sizeof(tx->tx_sp)/sizeof(tx->tx_sp[0]));
871         LASSERT (nob <= IBNAL_MSG_SIZE);
872
873         kibnal_init_msg(tx->tx_msg, type, body_nob);
874
875         /* Fence the message if it's bundled with an RDMA read */
876         fence = (tx->tx_nsp > 0) &&
877                 (type == IBNAL_MSG_PUT_DONE);
878
879         *gl = (struct ib_gather_scatter) {
880                 .address = tx->tx_vaddr,
881                 .length  = nob,
882                 .key     = kibnal_data.kib_tx_pages->ibp_lkey,
883         };
884
885         /* NB If this is an RDMA read, the completion message must wait for
886          * the RDMA to complete.  Sends wait for previous RDMA writes
887          * anyway... */
888         *sp = (struct ib_send_param) {
889                 .work_request_id      = kibnal_ptr2wreqid(tx, 0),
890                 .op                   = IB_OP_SEND,
891                 .gather_list          = gl,
892                 .num_gather_entries   = 1,
893                 .device_specific      = NULL,
894                 .solicited_event      = 1,
895                 .signaled             = 1,
896                 .immediate_data_valid = 0,
897                 .fence                = fence,
898                 .inline_data          = 0,
899         };
900
901         tx->tx_nsp++;
902 }
903
904 void
905 kibnal_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
906 {
907         unsigned long         flags;
908
909         spin_lock_irqsave(&conn->ibc_lock, flags);
910
911         kibnal_queue_tx_locked (tx, conn);
912         
913         spin_unlock_irqrestore(&conn->ibc_lock, flags);
914         
915         kibnal_check_sends(conn);
916 }
917
918 void
919 kibnal_launch_tx (kib_tx_t *tx, ptl_nid_t nid)
920 {
921         unsigned long    flags;
922         kib_peer_t      *peer;
923         kib_conn_t      *conn;
924         rwlock_t        *g_lock = &kibnal_data.kib_global_lock;
925
926         /* If I get here, I've committed to send, so I complete the tx with
927          * failure on any problems */
928         
929         LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
930         LASSERT (tx->tx_nsp > 0);               /* work items have been set up */
931
932         read_lock_irqsave(g_lock, flags);
933         
934         peer = kibnal_find_peer_locked (nid);
935         if (peer == NULL) {
936                 read_unlock_irqrestore(g_lock, flags);
937                 tx->tx_status = -EHOSTUNREACH;
938                 kibnal_tx_done (tx);
939                 return;
940         }
941
942         conn = kibnal_find_conn_locked (peer);
943         if (conn != NULL) {
944                 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
945                        conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
946                        atomic_read (&conn->ibc_refcount));
947                 atomic_inc (&conn->ibc_refcount); /* 1 ref for the tx */
948                 read_unlock_irqrestore(g_lock, flags);
949                 
950                 kibnal_queue_tx (tx, conn);
951                 return;
952         }
953         
954         /* Making one or more connections; I'll need a write lock... */
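        /* NB read_unlock() doesn't restore IRQ state, so interrupts stay
         * disabled from the read_lock_irqsave() above until the matching
         * write_unlock_irqrestore() on every exit path below. */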
955         read_unlock(g_lock);
956         write_lock(g_lock);
957
958         peer = kibnal_find_peer_locked (nid);
959         if (peer == NULL) {
960                 write_unlock_irqrestore (g_lock, flags);
961                 tx->tx_status = -EHOSTUNREACH;
962                 kibnal_tx_done (tx);
963                 return;
964         }
965
966         conn = kibnal_find_conn_locked (peer);
967         if (conn != NULL) {
968                 /* Connection exists; queue message on it */
969                 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
970                        conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
971                        atomic_read (&conn->ibc_refcount));
972                 atomic_inc (&conn->ibc_refcount); /* 1 ref for the tx */
973                 write_unlock_irqrestore (g_lock, flags);
974                 
975                 kibnal_queue_tx (tx, conn);
976                 return;
977         }
978
979         if (peer->ibp_connecting == 0) {
980                 if (!time_after_eq(jiffies, peer->ibp_reconnect_time)) {
981                         write_unlock_irqrestore (g_lock, flags);
982                         tx->tx_status = -EHOSTUNREACH;
983                         kibnal_tx_done (tx);
984                         return;
985                 }
986         
987                 peer->ibp_connecting = 1;
988                 atomic_inc (&peer->ibp_refcount); /* extra ref for connd */
989         
990                 spin_lock (&kibnal_data.kib_connd_lock);
991         
992                 list_add_tail (&peer->ibp_connd_list,
993                                &kibnal_data.kib_connd_peers);
994                 wake_up (&kibnal_data.kib_connd_waitq);
995         
996                 spin_unlock (&kibnal_data.kib_connd_lock);
997         }
998         
999         /* A connection is being established; queue the message... */
1000         list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);
1001
1002         write_unlock_irqrestore (g_lock, flags);
1003 }
1004
1005 ptl_err_t
1006 kibnal_start_passive_rdma (int type, ptl_nid_t nid,
1007                             lib_msg_t *libmsg, ptl_hdr_t *hdr)
1008 {
1009         int         nob = libmsg->md->length;
1010         kib_tx_t   *tx;
1011         kib_msg_t  *ibmsg;
1012         int         rc;
1013         int         access;
1014         
1015         LASSERT (type == IBNAL_MSG_PUT_RDMA || 
1016                  type == IBNAL_MSG_GET_RDMA);
1017         LASSERT (nob > 0);
1018         LASSERT (!in_interrupt());              /* Mapping could block */
1019
1020         if (type == IBNAL_MSG_PUT_RDMA) {
1021                 access = IB_ACCESS_REMOTE_READ;
1022         } else {
1023                 access = IB_ACCESS_REMOTE_WRITE |
1024                          IB_ACCESS_LOCAL_WRITE;
1025         }
1026
1027         tx = kibnal_get_idle_tx (1);           /* May block; caller is an app thread */
1028         LASSERT (tx != NULL);
1029
1030         if ((libmsg->md->options & PTL_MD_KIOV) == 0) 
1031                 rc = kibnal_map_iov (tx, access,
1032                                      libmsg->md->md_niov,
1033                                      libmsg->md->md_iov.iov,
1034                                      0, nob);
1035         else
1036                 rc = kibnal_map_kiov (tx, access,
1037                                       libmsg->md->md_niov, 
1038                                       libmsg->md->md_iov.kiov,
1039                                       0, nob);
1040
1041         if (rc != 0) {
1042                 CERROR ("Can't map RDMA for "LPX64": %d\n", nid, rc);
1043                 goto failed;
1044         }
1045         
1046         if (type == IBNAL_MSG_GET_RDMA) {
1047                 /* reply gets finalized when tx completes */
1048                 tx->tx_libmsg[1] = lib_create_reply_msg(&kibnal_lib, 
1049                                                         nid, libmsg);
1050                 if (tx->tx_libmsg[1] == NULL) {
1051                         CERROR ("Can't create reply for GET -> "LPX64"\n",
1052                                 nid);
1053                         rc = -ENOMEM;
1054                         goto failed;
1055                 }
1056         }
1057         
1058         tx->tx_passive_rdma = 1;
1059
1060         ibmsg = tx->tx_msg;
1061
1062         ibmsg->ibm_u.rdma.ibrm_hdr = *hdr;
1063         ibmsg->ibm_u.rdma.ibrm_cookie = tx->tx_passive_rdma_cookie;
1064         ibmsg->ibm_u.rdma.ibrm_desc.rd_key = tx->tx_md.md_rkey;
1065         ibmsg->ibm_u.rdma.ibrm_desc.rd_addr = tx->tx_md.md_addr;
1066         ibmsg->ibm_u.rdma.ibrm_desc.rd_nob = nob;
1067
1068         kibnal_init_tx_msg (tx, type, sizeof (kib_rdma_msg_t));
1069
1070         CDEBUG(D_NET, "Passive: %p cookie "LPX64", key %x, addr "
1071                LPX64", nob %d\n",
1072                tx, tx->tx_passive_rdma_cookie, tx->tx_md.md_rkey,
1073                tx->tx_md.md_addr, nob);
1074         
1075         /* libmsg gets finalized when tx completes. */
1076         tx->tx_libmsg[0] = libmsg;
1077
1078         kibnal_launch_tx(tx, nid);
1079         return (PTL_OK);
1080
1081  failed:
1082         tx->tx_status = rc;
1083         kibnal_tx_done (tx);
1084         return (PTL_FAIL);
1085 }
1086
1087 void
1088 kibnal_start_active_rdma (int type, int status,
1089                            kib_rx_t *rx, lib_msg_t *libmsg, 
1090                            unsigned int niov,
1091                            struct iovec *iov, ptl_kiov_t *kiov,
1092                            int offset, int nob)
1093 {
1094         kib_msg_t    *rxmsg = rx->rx_msg;
1095         kib_msg_t    *txmsg;
1096         kib_tx_t     *tx;
1097         int           access;
1098         int           rdma_op;
1099         int           rc;
1100
1101         CDEBUG(D_NET, "type %d, status %d, niov %d, offset %d, nob %d\n",
1102                type, status, niov, offset, nob);
1103
1104         /* Called by scheduler */
1105         LASSERT (!in_interrupt ());
1106
1107         /* Either all pages or all vaddrs */
1108         LASSERT (!(kiov != NULL && iov != NULL));
1109
1110         /* No data if we're completing with failure */
1111         LASSERT (status == 0 || nob == 0);
1112
1113         LASSERT (type == IBNAL_MSG_GET_DONE ||
1114                  type == IBNAL_MSG_PUT_DONE);
1115
1116         /* Flag I'm completing the RDMA.  Even if I fail to send the
1117          * completion message, I will have tried my best so further
1118          * attempts shouldn't be tried. */
1119         LASSERT (!rx->rx_rdma);
1120         rx->rx_rdma = 1;
1121
1122         if (type == IBNAL_MSG_GET_DONE) {
1123                 access   = 0;
1124                 rdma_op  = IB_OP_RDMA_WRITE;
1125                 LASSERT (rxmsg->ibm_type == IBNAL_MSG_GET_RDMA);
1126         } else {
1127                 access   = IB_ACCESS_LOCAL_WRITE;
1128                 rdma_op  = IB_OP_RDMA_READ;
1129                 LASSERT (rxmsg->ibm_type == IBNAL_MSG_PUT_RDMA);
1130         }
1131
1132         tx = kibnal_get_idle_tx (0);           /* Mustn't block */
1133         if (tx == NULL) {
1134                 CERROR ("tx descs exhausted on RDMA from "LPX64
1135                         " completing locally with failure\n",
1136                         rx->rx_conn->ibc_peer->ibp_nid);
1137                 lib_finalize (&kibnal_lib, NULL, libmsg, PTL_NO_SPACE);
1138                 return;
1139         }
1140         LASSERT (tx->tx_nsp == 0);
1141                         
1142         if (nob != 0) {
1143                 /* We actually need to transfer some data (the transfer
1144                  * size could get truncated to zero when the incoming
1145                  * message is matched) */
1146
1147                 if (kiov != NULL)
1148                         rc = kibnal_map_kiov (tx, access,
1149                                               niov, kiov, offset, nob);
1150                 else
1151                         rc = kibnal_map_iov (tx, access,
1152                                              niov, iov, offset, nob);
1153                 
1154                 if (rc != 0) {
1155                         CERROR ("Can't map RDMA -> "LPX64": %d\n", 
1156                                 rx->rx_conn->ibc_peer->ibp_nid, rc);
1157                         /* We'll skip the RDMA and complete with failure. */
1158                         status = rc;
1159                         nob = 0;
1160                 } else {
1161                         tx->tx_gl[0] = (struct ib_gather_scatter) {
1162                                 .address = tx->tx_md.md_addr,
1163                                 .length  = nob,
1164                                 .key     = tx->tx_md.md_lkey,
1165                         };
1166                 
1167                         tx->tx_sp[0] = (struct ib_send_param) {
1168                                 .work_request_id      = kibnal_ptr2wreqid(tx, 0),
1169                                 .op                   = rdma_op,
1170                                 .gather_list          = &tx->tx_gl[0],
1171                                 .num_gather_entries   = 1,
1172                                 .remote_address       = rxmsg->ibm_u.rdma.ibrm_desc.rd_addr,
1173                                 .rkey                 = rxmsg->ibm_u.rdma.ibrm_desc.rd_key,
1174                                 .device_specific      = NULL,
1175                                 .solicited_event      = 0,
1176                                 .signaled             = 1,
1177                                 .immediate_data_valid = 0,
1178                                 .fence                = 0,
1179                                 .inline_data          = 0,
1180                         };
1181
1182                         tx->tx_nsp = 1;
1183                 }
1184         }
1185
1186         txmsg = tx->tx_msg;
1187
1188         txmsg->ibm_u.completion.ibcm_cookie = rxmsg->ibm_u.rdma.ibrm_cookie;
1189         txmsg->ibm_u.completion.ibcm_status = status;
1190         
1191         kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));
1192
1193         if (status == 0 && nob != 0) {
1194                 LASSERT (tx->tx_nsp > 1);
1195                 /* RDMA: libmsg gets finalized when the tx completes.  This
1196                  * is after the completion message has been sent, which in
1197                  * turn is after the RDMA has finished. */
1198                 tx->tx_libmsg[0] = libmsg;
1199         } else {
1200                 LASSERT (tx->tx_nsp == 1);
1201                 /* No RDMA: local completion happens now! */
1202                 CDEBUG(D_NET, "No data: immediate completion\n");
1203                 lib_finalize (&kibnal_lib, NULL, libmsg,
1204                               status == 0 ? PTL_OK : PTL_FAIL);
1205         }
1206
1207         /* +1 ref for this tx... */
1208         CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1209                rx->rx_conn, rx->rx_conn->ibc_state, 
1210                rx->rx_conn->ibc_peer->ibp_nid,
1211                atomic_read (&rx->rx_conn->ibc_refcount));
1212         atomic_inc (&rx->rx_conn->ibc_refcount);
1213         /* ...and queue it up */
1214         kibnal_queue_tx(tx, rx->rx_conn);
1215 }
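/* Summary of the RDMA handshake implemented by the two routines above: the
 * node that owns the memory region (PUT source or GET sink) registers it
 * and advertises {rkey, addr, nob} plus a cookie in a PUT_RDMA/GET_RDMA
 * message (kibnal_start_passive_rdma), then waits.  The peer performs the
 * transfer itself -- RDMA_READ for PUT, RDMA_WRITE for GET -- and follows
 * it with a PUT_DONE/GET_DONE completion carrying the same cookie and a
 * status (kibnal_start_active_rdma); kibnal_complete_passive_rdma() then
 * matches the cookie and finalizes the original message. */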
1216
1217 ptl_err_t
1218 kibnal_sendmsg(lib_nal_t    *nal, 
1219                 void         *private,
1220                 lib_msg_t    *libmsg,
1221                 ptl_hdr_t    *hdr, 
1222                 int           type, 
1223                 ptl_nid_t     nid, 
1224                 ptl_pid_t     pid,
1225                 unsigned int  payload_niov, 
1226                 struct iovec *payload_iov, 
1227                 ptl_kiov_t   *payload_kiov,
1228                 int           payload_offset,
1229                 int           payload_nob)
1230 {
1231         kib_msg_t  *ibmsg;
1232         kib_tx_t   *tx;
1233         int         nob;
1234
1235         /* NB 'private' is different depending on what we're sending.... */
1236
1237         CDEBUG(D_NET, "sending %d bytes in %d frags to nid:"LPX64" pid %d\n",
1238                payload_nob, payload_niov, nid, pid);
1239
1240         LASSERT (payload_nob == 0 || payload_niov > 0);
1241         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1242
1243         /* Thread context if we're sending payload */
1244         LASSERT (!in_interrupt() || payload_niov == 0);
1245         /* payload is either all vaddrs or all pages */
1246         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1247
1248         switch (type) {
1249         default:
1250                 LBUG();
1251                 return (PTL_FAIL);
1252                 
1253         case PTL_MSG_REPLY: {
1254                 /* reply's 'private' is the incoming receive */
1255                 kib_rx_t *rx = private;
1256
1257                 /* RDMA reply expected? */
1258                 if (rx->rx_msg->ibm_type == IBNAL_MSG_GET_RDMA) {
1259                         kibnal_start_active_rdma(IBNAL_MSG_GET_DONE, 0,
1260                                                  rx, libmsg, payload_niov, 
1261                                                  payload_iov, payload_kiov,
1262                                                  payload_offset, payload_nob);
1263                         return (PTL_OK);
1264                 }
1265                 
1266                 /* Incoming message consistent with immediate reply? */
1267                 if (rx->rx_msg->ibm_type != IBNAL_MSG_IMMEDIATE) {
1268                         CERROR ("REPLY to "LPX64" bad msg type %d!!!\n",
1269                                 nid, rx->rx_msg->ibm_type);
1270                         return (PTL_FAIL);
1271                 }
1272
1273                 /* Will it fit in a message? */
1274                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1275                 if (nob > IBNAL_MSG_SIZE) {
1276                         CERROR("REPLY for "LPX64" too big (RDMA not requested): %d\n", 
1277                                nid, payload_nob);
1278                         return (PTL_FAIL);
1279                 }
1280                 break;
1281         }
1282
1283         case PTL_MSG_GET:
1284                 /* might the REPLY message be big enough to need RDMA? */
1285                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[libmsg->md->length]);
1286                 if (nob > IBNAL_MSG_SIZE)
1287                         return (kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA, 
1288                                                           nid, libmsg, hdr));
1289                 break;
1290
1291         case PTL_MSG_ACK:
1292                 LASSERT (payload_nob == 0);
1293                 break;
1294
1295         case PTL_MSG_PUT:
1296                 /* Is the payload big enough to need RDMA? */
1297                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1298                 if (nob > IBNAL_MSG_SIZE)
1299                         return (kibnal_start_passive_rdma(IBNAL_MSG_PUT_RDMA,
1300                                                           nid, libmsg, hdr));
1301                 
1302                 break;
1303         }
1304
1305         tx = kibnal_get_idle_tx(!(type == PTL_MSG_ACK ||
1306                                   type == PTL_MSG_REPLY ||
1307                                   in_interrupt()));
1308         if (tx == NULL) {
1309                 CERROR ("Can't send %d to "LPX64": tx descs exhausted%s\n", 
1310                         type, nid, in_interrupt() ? " (intr)" : "");
1311                 return (PTL_NO_SPACE);
1312         }
1313
1314         ibmsg = tx->tx_msg;
1315         ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1316
1317         if (payload_nob > 0) {
1318                 if (payload_kiov != NULL)
1319                         lib_copy_kiov2buf(ibmsg->ibm_u.immediate.ibim_payload,
1320                                           payload_niov, payload_kiov,
1321                                           payload_offset, payload_nob);
1322                 else
1323                         lib_copy_iov2buf(ibmsg->ibm_u.immediate.ibim_payload,
1324                                          payload_niov, payload_iov,
1325                                          payload_offset, payload_nob);
1326         }
1327
1328         kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE,
1329                             offsetof(kib_immediate_msg_t, 
1330                                      ibim_payload[payload_nob]));
1331
1332         /* libmsg gets finalized when tx completes */
1333         tx->tx_libmsg[0] = libmsg;
1334
1335         kibnal_launch_tx(tx, nid);
1336         return (PTL_OK);
1337 }
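/* The immediate-vs-RDMA choice above is purely size based: a payload goes
 * inline only while offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[nob])
 * fits within IBNAL_MSG_SIZE, i.e. the cutover payload size is
 * IBNAL_MSG_SIZE less the message and immediate-header overhead (constants
 * defined in openibnal.h). */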
1338
1339 ptl_err_t
1340 kibnal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
1341                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1342                unsigned int payload_niov, struct iovec *payload_iov,
1343                size_t payload_offset, size_t payload_len)
1344 {
1345         return (kibnal_sendmsg(nal, private, cookie,
1346                                hdr, type, nid, pid,
1347                                payload_niov, payload_iov, NULL,
1348                                payload_offset, payload_len));
1349 }
1350
1351 ptl_err_t
1352 kibnal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, 
1353                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1354                      unsigned int payload_niov, ptl_kiov_t *payload_kiov, 
1355                      size_t payload_offset, size_t payload_len)
1356 {
1357         return (kibnal_sendmsg(nal, private, cookie,
1358                                hdr, type, nid, pid,
1359                                payload_niov, NULL, payload_kiov,
1360                                payload_offset, payload_len));
1361 }
1362
1363 ptl_err_t
1364 kibnal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
1365                  unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
1366                  int offset, int mlen, int rlen)
1367 {
1368         kib_rx_t    *rx = private;
1369         kib_msg_t   *rxmsg = rx->rx_msg;
1370         int          msg_nob;
1371         
1372         LASSERT (mlen <= rlen);
1373         LASSERT (!in_interrupt ());
1374         /* Either all pages or all vaddrs */
1375         LASSERT (!(kiov != NULL && iov != NULL));
1376
1377         switch (rxmsg->ibm_type) {
1378         default:
1379                 LBUG();
1380                 return (PTL_FAIL);
1381                 
1382         case IBNAL_MSG_IMMEDIATE:
1383                 msg_nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
1384                 if (msg_nob > IBNAL_MSG_SIZE) {
1385                         CERROR ("Immediate message from "LPX64" too big: %d\n",
1386                                 rxmsg->ibm_u.immediate.ibim_hdr.src_nid, rlen);
1387                         return (PTL_FAIL);
1388                 }
1389
1390                 if (kiov != NULL)
1391                         lib_copy_buf2kiov(niov, kiov, offset,
1392                                           rxmsg->ibm_u.immediate.ibim_payload,
1393                                           mlen);
1394                 else
1395                         lib_copy_buf2iov(niov, iov, offset,
1396                                          rxmsg->ibm_u.immediate.ibim_payload,
1397                                          mlen);
1398
1399                 lib_finalize (nal, NULL, libmsg, PTL_OK);
1400                 return (PTL_OK);
1401
1402         case IBNAL_MSG_GET_RDMA:
1403                 /* We get called here just to discard any junk after the
1404                  * GET hdr. */
1405                 LASSERT (libmsg == NULL);
1406                 lib_finalize (nal, NULL, libmsg, PTL_OK);
1407                 return (PTL_OK);
1408
1409         case IBNAL_MSG_PUT_RDMA:
1410                 kibnal_start_active_rdma (IBNAL_MSG_PUT_DONE, 0,
1411                                           rx, libmsg, 
1412                                           niov, iov, kiov, offset, mlen);
1413                 return (PTL_OK);
1414         }
1415 }
1416
1417 ptl_err_t
1418 kibnal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
1419               unsigned int niov, struct iovec *iov, 
1420               size_t offset, size_t mlen, size_t rlen)
1421 {
1422         return (kibnal_recvmsg (nal, private, msg, niov, iov, NULL,
1423                                 offset, mlen, rlen));
1424 }
1425
1426 ptl_err_t
1427 kibnal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
1428                      unsigned int niov, ptl_kiov_t *kiov, 
1429                      size_t offset, size_t mlen, size_t rlen)
1430 {
1431         return (kibnal_recvmsg (nal, private, msg, niov, NULL, kiov,
1432                                 offset, mlen, rlen));
1433 }
1434
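/* Spawn a kernel thread running fn(arg) and count it in kib_nthreads;
 * kibnal_thread_fini() drops the count when a thread exits. */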
1435 int
1436 kibnal_thread_start (int (*fn)(void *arg), void *arg)
1437 {
1438         long    pid = kernel_thread (fn, arg, 0);
1439
1440         if (pid < 0)
1441                 return ((int)pid);
1442
1443         atomic_inc (&kibnal_data.kib_nthreads);
1444         return (0);
1445 }
1446
1447 void
1448 kibnal_thread_fini (void)
1449 {
1450         atomic_dec (&kibnal_data.kib_nthreads);
1451 }
1452
1453 void
1454 kibnal_close_conn_locked (kib_conn_t *conn, int error)
1455 {
1456         /* This just does the immediate housekeeping, and schedules the
1457          * connection for the reaper to finish off.
1458          * Caller holds kib_global_lock exclusively in irq context */
1459         kib_peer_t   *peer = conn->ibc_peer;
1460
1461         CDEBUG (error == 0 ? D_NET : D_ERROR,
1462                 "closing conn to "LPX64": error %d\n", peer->ibp_nid, error);
1463         
1464         LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED ||
1465                  conn->ibc_state == IBNAL_CONN_CONNECTING);
1466
1467         if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
1468                 /* kib_reaper_conns takes ibc_list's ref */
1469                 list_del (&conn->ibc_list);
1470         } else {
1471                 /* new ref for kib_reaper_conns */
1472                 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1473                        conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1474                        atomic_read (&conn->ibc_refcount));
1475                 atomic_inc (&conn->ibc_refcount);
1476         }
1477         
1478         if (list_empty (&peer->ibp_conns) &&
1479             peer->ibp_persistence == 0) {
1480                 /* Non-persistent peer with no more conns... */
1481                 kibnal_unlink_peer_locked (peer);
1482         }
1483
1484         conn->ibc_state = IBNAL_CONN_DEATHROW;
1485
1486         /* Schedule conn for closing/destruction */
1487         spin_lock (&kibnal_data.kib_reaper_lock);
1488
1489         list_add_tail (&conn->ibc_list, &kibnal_data.kib_reaper_conns);
1490         wake_up (&kibnal_data.kib_reaper_waitq);
1491                 
1492         spin_unlock (&kibnal_data.kib_reaper_lock);
1493 }
1494
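/* Close 'conn' unless it is already being torn down; returns 1 if the
 * connection was actually scheduled for closing, 0 otherwise. */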
1495 int
1496 kibnal_close_conn (kib_conn_t *conn, int why)
1497 {
1498         unsigned long     flags;
1499         int               count = 0;
1500
1501         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1502
1503         LASSERT (conn->ibc_state >= IBNAL_CONN_CONNECTING);
1504         
1505         if (conn->ibc_state <= IBNAL_CONN_ESTABLISHED) {
1506                 count = 1;
1507                 kibnal_close_conn_locked (conn, why);
1508         }
1509         
1510         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1511         return (count);
1512 }
1513
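/* A connection attempt to 'peer' failed with 'rc'.  If no other attempt
 * is still in progress, back off the reconnection interval and complete
 * any transmits queued on the peer with -EHOSTUNREACH. */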
1514 void
1515 kibnal_peer_connect_failed (kib_peer_t *peer, int active, int rc)
1516 {
1517         LIST_HEAD        (zombies);
1518         kib_tx_t         *tx;
1519         unsigned long     flags;
1520
1521         LASSERT (rc != 0);
1522         LASSERT (peer->ibp_reconnect_interval >= IBNAL_MIN_RECONNECT_INTERVAL);
1523
1524         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1525
1526         LASSERT (peer->ibp_connecting != 0);
1527         peer->ibp_connecting--;
1528
1529         if (peer->ibp_connecting != 0) {
1530                 /* another connection attempt under way (loopback?)... */
1531                 write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1532                 return;
1533         }
1534
1535         if (list_empty(&peer->ibp_conns)) {
1536                 /* Say when active connection can be re-attempted */
1537                 peer->ibp_reconnect_time = jiffies + peer->ibp_reconnect_interval;
1538                 /* Increase reconnection interval */
1539                 peer->ibp_reconnect_interval = MIN (peer->ibp_reconnect_interval * 2,
1540                                                     IBNAL_MAX_RECONNECT_INTERVAL);
1541         
1542                 /* Take the peer's blocked transmits; I'll complete
1543                  * them with error */
1544                 while (!list_empty (&peer->ibp_tx_queue)) {
1545                         tx = list_entry (peer->ibp_tx_queue.next,
1546                                          kib_tx_t, tx_list);
1547                         
1548                         list_del (&tx->tx_list);
1549                         list_add_tail (&tx->tx_list, &zombies);
1550                 }
1551                 
1552                 if (kibnal_peer_active(peer) &&
1553                     (peer->ibp_persistence == 0)) {
1554                         /* failed connection attempt on non-persistent peer */
1555                         kibnal_unlink_peer_locked (peer);
1556                 }
1557         } else {
1558                 /* Can't have blocked transmits if there are connections */
1559                 LASSERT (list_empty(&peer->ibp_tx_queue));
1560         }
1561         
1562         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1563
1564         if (!list_empty (&zombies))
1565                 CERROR ("Deleting messages for "LPX64": connection failed\n",
1566                         peer->ibp_nid);
1567
1568         while (!list_empty (&zombies)) {
1569                 tx = list_entry (zombies.next, kib_tx_t, tx_list);
1570
1571                 list_del (&tx->tx_list);
1572                 /* complete now */
1573                 tx->tx_status = -EHOSTUNREACH;
1574                 kibnal_tx_done (tx);
1575         }
1576 }
1577
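/* Connection setup (active or passive) completed with 'status'.  On
 * success the conn joins the peer's list, blocked transmits are queued on
 * it and the receive buffers are posted; on failure the conn is handed to
 * the reaper (or left a zombie) and the peer is notified. */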
1578 void
1579 kibnal_connreq_done (kib_conn_t *conn, int active, int status)
1580 {
1581         int               state = conn->ibc_state;
1582         kib_peer_t       *peer = conn->ibc_peer;
1583         kib_tx_t         *tx;
1584         unsigned long     flags;
1585         int               rc;
1586         int               i;
1587
1588         /* passive connection has no connreq & vice versa */
1589         LASSERT (!active == !(conn->ibc_connreq != NULL));
1590         if (active) {
1591                 PORTAL_FREE (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
1592                 conn->ibc_connreq = NULL;
1593         }
1594
1595         if (state == IBNAL_CONN_CONNECTING) {
1596                 /* Install common (active/passive) callback for
1597                  * disconnect/idle notification if I got as far as getting
1598                  * a CM comm_id */
1599                 rc = tsIbCmCallbackModify(conn->ibc_comm_id, 
1600                                           kibnal_conn_callback, conn);
1601                 LASSERT (rc == 0);
1602         }
1603         
1604         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1605
1606         LASSERT (peer->ibp_connecting != 0);
1607         
1608         if (status == 0) {                         
1609                 /* connection established... */
1610                 LASSERT (state == IBNAL_CONN_CONNECTING);
1611                 conn->ibc_state = IBNAL_CONN_ESTABLISHED;
1612
1613                 if (!kibnal_peer_active(peer)) {
1614                         /* ...but peer deleted meantime */
1615                         status = -ECONNABORTED;
1616                 }
1617         } else {
1618                 LASSERT (state == IBNAL_CONN_INIT_QP ||
1619                          state == IBNAL_CONN_CONNECTING);
1620         }
1621
1622         if (status == 0) {
1623                 /* Everything worked! */
1624
1625                 peer->ibp_connecting--;
1626
1627                 /* +1 ref for ibc_list; the caller's (i.e. the CM's) ref
1628                  * remains until the IB_CM_IDLE callback */
1629                 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1630                        conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1631                        atomic_read (&conn->ibc_refcount));
1632                 atomic_inc (&conn->ibc_refcount);
1633                 list_add (&conn->ibc_list, &peer->ibp_conns);
1634                 
1635                 /* reset reconnect interval for next attempt */
1636                 peer->ibp_reconnect_interval = IBNAL_MIN_RECONNECT_INTERVAL;
1637
1638                 /* post blocked sends to the new connection */
1639                 spin_lock (&conn->ibc_lock);
1640                 
1641                 while (!list_empty (&peer->ibp_tx_queue)) {
1642                         tx = list_entry (peer->ibp_tx_queue.next, 
1643                                          kib_tx_t, tx_list);
1644                         
1645                         list_del (&tx->tx_list);
1646
1647                         /* +1 ref for each tx */
1648                         CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1649                                conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1650                                atomic_read (&conn->ibc_refcount));
1651                         atomic_inc (&conn->ibc_refcount);
1652                         kibnal_queue_tx_locked (tx, conn);
1653                 }
1654                 
1655                 spin_unlock (&conn->ibc_lock);
1656
1657                 /* Nuke any dangling conns from a different peer instance... */
1658                 kibnal_close_stale_conns_locked (conn->ibc_peer,
1659                                                  conn->ibc_incarnation);
1660
1661                 write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1662
1663                 /* queue up all the receives */
1664                 for (i = 0; i < IBNAL_RX_MSGS; i++) {
1665                         /* +1 ref for rx desc */
1666                         CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1667                                conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1668                                atomic_read (&conn->ibc_refcount));
1669                         atomic_inc (&conn->ibc_refcount);
1670
1671                         CDEBUG(D_NET, "RX[%d] %p->%p - "LPX64"\n",
1672                                i, &conn->ibc_rxs[i], conn->ibc_rxs[i].rx_msg,
1673                                conn->ibc_rxs[i].rx_vaddr);
1674
1675                         kibnal_post_rx (&conn->ibc_rxs[i], 0);
1676                 }
1677
1678                 kibnal_check_sends (conn);
1679                 return;
1680         }
1681
1682         /* connection failed */
1683         if (state == IBNAL_CONN_CONNECTING) {
1684                 /* schedule for reaper to close */
1685                 kibnal_close_conn_locked (conn, status);
1686         } else {
1687                 /* Don't have a CM comm_id; just wait for refs to drain */
1688                 conn->ibc_state = IBNAL_CONN_ZOMBIE;
1689         } 
1690
1691         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1692
1693         kibnal_peer_connect_failed (conn->ibc_peer, active, status);
1694
1695         if (state != IBNAL_CONN_CONNECTING) {
1696                 /* drop caller's ref if we're not waiting for the
1697                  * IB_CM_IDLE callback */
1698                 kibnal_put_conn (conn);
1699         }
1700 }
1701
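/* Handle a passive connection request: validate the CONNREQ message,
 * find or create the peer, and return a new conn in CONNECTING state via
 * *connp. */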
1702 int
1703 kibnal_accept (kib_conn_t **connp, tTS_IB_CM_COMM_ID cid,
1704                kib_msg_t *msg, int nob)
1705 {
1706         kib_conn_t    *conn;
1707         kib_peer_t    *peer;
1708         kib_peer_t    *peer2;
1709         unsigned long  flags;
1710         int            rc;
1711
1712         rc = kibnal_unpack_msg(msg, nob);
1713         if (rc != 0) {
1714                 CERROR("Can't unpack connreq msg: %d\n", rc);
1715                 return -EPROTO;
1716         }
1717
1718         CDEBUG(D_NET, "connreq from "LPX64"\n", msg->ibm_srcnid);
1719
1720         if (msg->ibm_type != IBNAL_MSG_CONNREQ) {
1721                 CERROR("Unexpected connreq msg type: %x from "LPX64"\n",
1722                        msg->ibm_type, msg->ibm_srcnid);
1723                 return -EPROTO;
1724         }
1725                 
1726         if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
1727                 CERROR("Can't accept "LPX64": bad queue depth %d (%d expected)\n",
1728                        msg->ibm_srcnid, msg->ibm_u.connparams.ibcp_queue_depth, 
1729                        IBNAL_MSG_QUEUE_SIZE);
1730                 return (-EPROTO);
1731         }
1732         
1733         conn = kibnal_create_conn();
1734         if (conn == NULL)
1735                 return (-ENOMEM);
1736
1737         /* assume the source nid is a new peer */
1738         peer = kibnal_create_peer (msg->ibm_srcnid);
1739         if (peer == NULL) {
1740                 CDEBUG(D_NET, "--conn[%p] state %d -> "LPX64" (%d)\n",
1741                        conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1742                        atomic_read (&conn->ibc_refcount));
1743                 atomic_dec (&conn->ibc_refcount);
1744                 kibnal_destroy_conn(conn);
1745                 return (-ENOMEM);
1746         }
1747         
1748         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1749
1750         /* Check I'm the same instance that gave the connection parameters.  
1751          * NB If my incarnation changes after this, the peer will get nuked and
1752          * we'll spot that when the connection is finally added into the peer's
1753          * connlist */
1754         if (msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid ||
1755             msg->ibm_dststamp != kibnal_data.kib_incarnation) {
1756                 write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1757                 
1758                 CERROR("Stale connection params from "LPX64"\n",
1759                        msg->ibm_srcnid);
1760                 atomic_dec(&conn->ibc_refcount);
1761                 kibnal_destroy_conn(conn);
1762                 kibnal_put_peer(peer);
1763                 return -ESTALE;
1764         }
1765
1766         peer2 = kibnal_find_peer_locked(msg->ibm_srcnid);
1767         if (peer2 == NULL) {
1768                 /* peer table takes my ref on peer */
1769                 list_add_tail (&peer->ibp_list,
1770                                kibnal_nid2peerlist(msg->ibm_srcnid));
1771         } else {
1772                 kibnal_put_peer (peer);
1773                 peer = peer2;
1774         }
1775
1776         /* +1 ref for conn */
1777         atomic_inc (&peer->ibp_refcount);
1778         peer->ibp_connecting++;
1779
1780         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1781
1782         conn->ibc_peer = peer;
1783         conn->ibc_state = IBNAL_CONN_CONNECTING;
1784         conn->ibc_comm_id = cid;
1785         conn->ibc_incarnation = msg->ibm_srcstamp;
1786         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
1787
1788         *connp = conn;
1789         return (0);
1790 }
1791
1792 tTS_IB_CM_CALLBACK_RETURN
1793 kibnal_idle_conn_callback (tTS_IB_CM_EVENT event,
1794                             tTS_IB_CM_COMM_ID cid,
1795                             void *param,
1796                             void *arg)
1797 {
1798         /* Shouldn't ever get a callback after TS_IB_CM_IDLE */
1799         CERROR ("Unexpected event %d: conn %p\n", event, arg);
1800         LBUG ();
1801         return TS_IB_CM_CALLBACK_PROCEED;
1802 }
1803
1804 tTS_IB_CM_CALLBACK_RETURN
1805 kibnal_conn_callback (tTS_IB_CM_EVENT event,
1806                        tTS_IB_CM_COMM_ID cid,
1807                        void *param,
1808                        void *arg)
1809 {
1810         kib_conn_t       *conn = arg;
1811         LIST_HEAD        (zombies); 
1812         struct list_head *tmp;
1813         struct list_head *nxt;
1814         kib_tx_t         *tx;
1815         unsigned long     flags;
1816         int               done;
1817         int               rc;
1818
1819         /* Established Connection Notifier */
1820
1821         switch (event) {
1822         default:
1823                 CERROR("Connection %p -> "LPX64" ERROR %d\n",
1824                        conn, conn->ibc_peer->ibp_nid, event);
1825                 kibnal_close_conn (conn, -ECONNABORTED);
1826                 break;
1827                 
1828         case TS_IB_CM_DISCONNECTED:
1829                 CDEBUG(D_WARNING, "Connection %p -> "LPX64" DISCONNECTED.\n",
1830                        conn, conn->ibc_peer->ibp_nid);
1831                 kibnal_close_conn (conn, 0);
1832                 break;
1833
1834         case TS_IB_CM_IDLE:
1835                 CDEBUG(D_NET, "Connection %p -> "LPX64" IDLE.\n",
1836                        conn, conn->ibc_peer->ibp_nid);
1837                 kibnal_put_conn (conn);        /* Lose CM's ref */
1838
1839                 /* LASSERT (no further callbacks) */
1840                 rc = tsIbCmCallbackModify(cid, 
1841                                           kibnal_idle_conn_callback, conn);
1842                 LASSERT (rc == 0);
1843
1844                 /* NB we wait until the connection has closed before
1845                  * completing outstanding passive RDMAs so we can be sure
1846                  * the network can't touch the mapped memory any more. */
1847
1848                 spin_lock_irqsave (&conn->ibc_lock, flags);
1849
1850                 /* grab passive RDMAs not waiting for the tx callback */
1851                 list_for_each_safe (tmp, nxt, &conn->ibc_active_txs) {
1852                         tx = list_entry (tmp, kib_tx_t, tx_list);
1853
1854                         LASSERT (tx->tx_passive_rdma ||
1855                                  !tx->tx_passive_rdma_wait);
1856
1857                         LASSERT (tx->tx_passive_rdma_wait ||
1858                                  tx->tx_sending != 0);
1859
1860                         /* still waiting for tx callback? */
1861                         if (!tx->tx_passive_rdma_wait)
1862                                 continue;
1863
1864                         tx->tx_status = -ECONNABORTED;
1865                         tx->tx_passive_rdma_wait = 0;
1866                         done = (tx->tx_sending == 0);
1867
1868                         if (!done)
1869                                 continue;
1870
1871                         list_del (&tx->tx_list);
1872                         list_add (&tx->tx_list, &zombies);
1873                 }
1874
1875                 /* grab all blocked transmits */
1876                 list_for_each_safe (tmp, nxt, &conn->ibc_tx_queue) {
1877                         tx = list_entry (tmp, kib_tx_t, tx_list);
1878                         
1879                         list_del (&tx->tx_list);
1880                         list_add (&tx->tx_list, &zombies);
1881                 }
1882                 
1883                 spin_unlock_irqrestore (&conn->ibc_lock, flags);
1884
1885                 while (!list_empty(&zombies)) {
1886                         tx = list_entry (zombies.next, kib_tx_t, tx_list);
1887
1888                         list_del(&tx->tx_list);
1889                         kibnal_tx_done (tx);
1890                 }
1891                 break;
1892         }
1893
1894         return TS_IB_CM_CALLBACK_PROCEED;
1895 }
1896
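/* CM callback for passively-initiated connections: accepts the REQ and
 * builds the CONNACK reply, then completes the connreq once the
 * connection is established (or aborted). */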
1897 tTS_IB_CM_CALLBACK_RETURN
1898 kibnal_passive_conn_callback (tTS_IB_CM_EVENT event,
1899                                tTS_IB_CM_COMM_ID cid,
1900                                void *param,
1901                                void *arg)
1902 {
1903         kib_conn_t  *conn = arg;
1904         int          rc;
1905         
1906         switch (event) {
1907         default:
1908                 if (conn == NULL) {
1909                         /* no connection yet */
1910                         CERROR ("Unexpected event: %d\n", event);
1911                         return TS_IB_CM_CALLBACK_ABORT;
1912                 }
1913                 
1914                 CERROR ("Unexpected event %p -> "LPX64": %d\n", 
1915                         conn, conn->ibc_peer->ibp_nid, event);
1916                 kibnal_connreq_done (conn, 0, -ECONNABORTED);
1917                 break;
1918                 
1919         case TS_IB_CM_REQ_RECEIVED: {
1920                 struct ib_cm_req_received_param *req = param;
1921                 kib_msg_t                       *msg = req->remote_private_data;
1922
1923                 LASSERT (conn == NULL);
1924
1925                 /* Don't really know srcnid until successful unpack */
1926                 CDEBUG(D_NET, "REQ from ?"LPX64"?\n", msg->ibm_srcnid);
1927
1928                 rc = kibnal_accept(&conn, cid, msg, 
1929                                    req->remote_private_data_len);
1930                 if (rc != 0) {
1931                         CERROR ("Can't accept ?"LPX64"?: %d\n",
1932                                 msg->ibm_srcnid, rc);
1933                         return TS_IB_CM_CALLBACK_ABORT;
1934                 }
1935
1936                 /* update 'arg' for next callback */
1937                 rc = tsIbCmCallbackModify(cid, kibnal_passive_conn_callback, conn);
1938                 LASSERT (rc == 0);
1939
1940                 msg = req->accept_param.reply_private_data;
1941                 kibnal_init_msg(msg, IBNAL_MSG_CONNACK,
1942                                 sizeof(msg->ibm_u.connparams));
1943
1944                 msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
1945
1946                 kibnal_pack_msg(msg, 0, 
1947                                 conn->ibc_peer->ibp_nid, 
1948                                 conn->ibc_incarnation);
1949
1950                 req->accept_param.qp                     = conn->ibc_qp;
1951                 req->accept_param.reply_private_data_len = msg->ibm_nob;
1952                 req->accept_param.responder_resources    = IBNAL_RESPONDER_RESOURCES;
1953                 req->accept_param.initiator_depth        = IBNAL_RESPONDER_RESOURCES;
1954                 req->accept_param.rnr_retry_count        = IBNAL_RNR_RETRY;
1955                 req->accept_param.flow_control           = IBNAL_FLOW_CONTROL;
1956
1957                 CDEBUG(D_NET, "Proceeding\n");
1958                 break;
1959         }
1960
1961         case TS_IB_CM_ESTABLISHED:
1962                 LASSERT (conn != NULL);
1963                 CDEBUG(D_WARNING, "Connection %p -> "LPX64" ESTABLISHED.\n",
1964                        conn, conn->ibc_peer->ibp_nid);
1965
1966                 kibnal_connreq_done (conn, 0, 0);
1967                 break;
1968         }
1969
1970         /* NB if the connreq is done, we switch to kibnal_conn_callback */
1971         return TS_IB_CM_CALLBACK_PROCEED;
1972 }
1973
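/* CM callback for actively-initiated connections: validates the peer's
 * CONNACK (REP) and completes the connreq on ESTABLISHED, IDLE or
 * error. */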
1974 tTS_IB_CM_CALLBACK_RETURN
1975 kibnal_active_conn_callback (tTS_IB_CM_EVENT event,
1976                               tTS_IB_CM_COMM_ID cid,
1977                               void *param,
1978                               void *arg)
1979 {
1980         kib_conn_t *conn = arg;
1981
1982         switch (event) {
1983         case TS_IB_CM_REP_RECEIVED: {
1984                 struct ib_cm_rep_received_param *rep = param;
1985                 kib_msg_t                       *msg = rep->remote_private_data;
1986                 int                              nob = rep->remote_private_data_len;
1987                 int                              rc;
1988
1989                 rc = kibnal_unpack_msg(msg, nob);
1990                 if (rc != 0) {
1991                         CERROR ("Error %d unpacking conn ack from "LPX64"\n",
1992                                 rc, conn->ibc_peer->ibp_nid);
1993                         kibnal_connreq_done (conn, 1, rc);
1994                         break;
1995                 }
1996
1997                 if (msg->ibm_type != IBNAL_MSG_CONNACK) {
1998                         CERROR ("Unexpected conn ack type %d from "LPX64"\n",
1999                                 msg->ibm_type, conn->ibc_peer->ibp_nid);
2000                         kibnal_connreq_done (conn, 1, -EPROTO);
2001                         break;
2002                 }
2003
2004                 if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
2005                     msg->ibm_srcstamp != conn->ibc_incarnation ||
2006                     msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid ||
2007                     msg->ibm_dststamp != kibnal_data.kib_incarnation) {
2008                         CERROR("Stale conn ack from "LPX64"\n",
2009                                conn->ibc_peer->ibp_nid);
2010                         kibnal_connreq_done (conn, 1, -ESTALE);
2011                         break;
2012                 }
2013
2014                 if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
2015                         CERROR ("Bad queue depth %d from "LPX64"\n",
2016                                 msg->ibm_u.connparams.ibcp_queue_depth,
2017                                 conn->ibc_peer->ibp_nid);
2018                         kibnal_connreq_done (conn, 1, -EPROTO);
2019                         break;
2020                 }
2021                                 
2022                 CDEBUG(D_NET, "Connection %p -> "LPX64" REP_RECEIVED.\n",
2023                        conn, conn->ibc_peer->ibp_nid);
2024
2025                 conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2026                 break;
2027         }
2028
2029         case TS_IB_CM_ESTABLISHED:
2030                 CDEBUG(D_WARNING, "Connection %p -> "LPX64" ESTABLISHED\n",
2031                        conn, conn->ibc_peer->ibp_nid);
2032
2033                 kibnal_connreq_done (conn, 1, 0);
2034                 break;
2035
2036         case TS_IB_CM_IDLE:
2037                 CERROR("Connection %p -> "LPX64" IDLE\n",
2038                        conn, conn->ibc_peer->ibp_nid);
2039                 /* Back out state change: I'm disengaged from CM */
2040                 conn->ibc_state = IBNAL_CONN_INIT_QP;
2041                 
2042                 kibnal_connreq_done (conn, 1, -ECONNABORTED);
2043                 break;
2044
2045         default:
2046                 CERROR("Connection %p -> "LPX64" ERROR %d\n",
2047                        conn, conn->ibc_peer->ibp_nid, event);
2048                 kibnal_connreq_done (conn, 1, -ECONNABORTED);
2049                 break;
2050         }
2051
2052         /* NB if the connreq is done, we switch to kibnal_conn_callback */
2053         return TS_IB_CM_CALLBACK_PROCEED;
2054 }
2055
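/* Path record lookup completion: on success build the CONNREQ message
 * and start the CM connection; on any failure complete the connreq with
 * the error. */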
2056 int
2057 kibnal_pathreq_callback (tTS_IB_CLIENT_QUERY_TID tid, int status,
2058                           struct ib_path_record *resp, int remaining,
2059                           void *arg)
2060 {
2061         kib_conn_t *conn = arg;
2062         kib_peer_t *peer = conn->ibc_peer;
2063         kib_msg_t  *msg = &conn->ibc_connreq->cr_msg;
2064
2065         if (status != 0) {
2066                 CERROR ("status %d\n", status);
2067                 kibnal_connreq_done (conn, 1, status);
2068                 goto out;
2069         }
2070
2071         conn->ibc_connreq->cr_path = *resp;
2072
2073         kibnal_init_msg(msg, IBNAL_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
2074         msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
2075         kibnal_pack_msg(msg, 0, peer->ibp_nid, conn->ibc_incarnation);
2076
2077         conn->ibc_connreq->cr_connparam = (struct ib_cm_active_param) {
2078                 .qp                   = conn->ibc_qp,
2079                 .req_private_data     = msg,
2080                 .req_private_data_len = msg->ibm_nob,
2081                 .responder_resources  = IBNAL_RESPONDER_RESOURCES,
2082                 .initiator_depth      = IBNAL_RESPONDER_RESOURCES,
2083                 .retry_count          = IBNAL_RETRY,
2084                 .rnr_retry_count      = IBNAL_RNR_RETRY,
2085                 .cm_response_timeout  = kibnal_tunables.kib_io_timeout,
2086                 .max_cm_retries       = IBNAL_CM_RETRY,
2087                 .flow_control         = IBNAL_FLOW_CONTROL,
2088         };
2089
2090         /* XXX set timeout just like SDP!!! */
2091         conn->ibc_connreq->cr_path.packet_life = 13;
2092         
2093         /* Flag I'm getting involved with the CM... */
2094         conn->ibc_state = IBNAL_CONN_CONNECTING;
2095
2096         CDEBUG(D_NET, "Connecting to service id "LPX64" on "LPX64"\n",
2097                conn->ibc_connreq->cr_svcrsp.ibsr_svc_id, peer->ibp_nid);
2098
2099         /* kibnal_connect_callback gets my conn ref */
2100         status = ib_cm_connect (&conn->ibc_connreq->cr_connparam, 
2101                                 &conn->ibc_connreq->cr_path, NULL,
2102                                 conn->ibc_connreq->cr_svcrsp.ibsr_svc_id, 0,
2103                                 kibnal_active_conn_callback, conn,
2104                                 &conn->ibc_comm_id);
2105         if (status != 0) {
2106                 CERROR ("Connect: %d\n", status);
2107                 /* Back out state change: I've not got a CM comm_id yet... */
2108                 conn->ibc_state = IBNAL_CONN_INIT_QP;
2109                 kibnal_connreq_done (conn, 1, status);
2110         }
2111         
2112  out:
2113         /* return non-zero to prevent further callbacks */
2114         return 1;
2115 }
2116
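/* Initiate an active connection to 'peer': service query first, then the
 * path record lookup; kibnal_pathreq_callback carries on from there. */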
2117 void
2118 kibnal_connect_peer (kib_peer_t *peer)
2119 {
2120         kib_conn_t  *conn;
2121         int          rc;
2122
2123         conn = kibnal_create_conn();
2124         if (conn == NULL) {
2125                 CERROR ("Can't allocate conn\n");
2126                 kibnal_peer_connect_failed (peer, 1, -ENOMEM);
2127                 return;
2128         }
2129
2130         conn->ibc_peer = peer;
2131         atomic_inc (&peer->ibp_refcount);
2132
2133         PORTAL_ALLOC (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
2134         if (conn->ibc_connreq == NULL) {
2135                 CERROR ("Can't allocate connreq\n");
2136                 kibnal_connreq_done (conn, 1, -ENOMEM);
2137                 return;
2138         }
2139
2140         memset(conn->ibc_connreq, 0, sizeof (*conn->ibc_connreq));
2141
2142         rc = kibnal_make_svcqry(conn);
2143         if (rc != 0) {
2144                 kibnal_connreq_done (conn, 1, rc);
2145                 return;
2146         }
2147
2148         rc = ib_cached_gid_get(kibnal_data.kib_device,
2149                                kibnal_data.kib_port, 0,
2150                                conn->ibc_connreq->cr_gid);
2151         LASSERT (rc == 0);
2152
2153         /* kibnal_pathreq_callback gets my conn ref */
2154         rc = tsIbPathRecordRequest (kibnal_data.kib_device,
2155                                     kibnal_data.kib_port,
2156                                     conn->ibc_connreq->cr_gid,
2157                                     conn->ibc_connreq->cr_svcrsp.ibsr_svc_gid,
2158                                     conn->ibc_connreq->cr_svcrsp.ibsr_svc_pkey,
2159                                     0,
2160                                     kibnal_tunables.kib_io_timeout * HZ,
2161                                     0,
2162                                     kibnal_pathreq_callback, conn, 
2163                                     &conn->ibc_connreq->cr_tid);
2164         if (rc == 0)
2165                 return;
2166
2167         CERROR ("Path record request: %d\n", rc);
2168         kibnal_connreq_done (conn, 1, rc);
2169 }
2170
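/* Return non-zero if any queued or active transmit on 'conn' has passed
 * its deadline. */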
2171 int
2172 kibnal_conn_timed_out (kib_conn_t *conn)
2173 {
2174         kib_tx_t          *tx;
2175         struct list_head  *ttmp;
2176         unsigned long      flags;
2177
2178         spin_lock_irqsave (&conn->ibc_lock, flags);
2179
2180         list_for_each (ttmp, &conn->ibc_tx_queue) {
2181                 tx = list_entry (ttmp, kib_tx_t, tx_list);
2182
2183                 LASSERT (!tx->tx_passive_rdma_wait);
2184                 LASSERT (tx->tx_sending == 0);
2185
2186                 if (time_after_eq (jiffies, tx->tx_deadline)) {
2187                         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2188                         return 1;
2189                 }
2190         }
2191
2192         list_for_each (ttmp, &conn->ibc_active_txs) {
2193                 tx = list_entry (ttmp, kib_tx_t, tx_list);
2194
2195                 LASSERT (tx->tx_passive_rdma ||
2196                          !tx->tx_passive_rdma_wait);
2197
2198                 LASSERT (tx->tx_passive_rdma_wait ||
2199                          tx->tx_sending != 0);
2200
2201                 if (time_after_eq (jiffies, tx->tx_deadline)) {
2202                         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2203                         return 1;
2204                 }
2205         }
2206
2207         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2208
2209         return 0;
2210 }
2211
2212 void
2213 kibnal_check_conns (int idx)
2214 {
2215         struct list_head  *peers = &kibnal_data.kib_peers[idx];
2216         struct list_head  *ptmp;
2217         kib_peer_t        *peer;
2218         kib_conn_t        *conn;
2219         struct list_head  *ctmp;
2220         unsigned long      flags;
2221
2222  again:
2223         /* NB. We expect to have a look at all the peers and not find any
2224          * rdmas to time out, so we just use a shared lock while we
2225          * take a look... */
2226         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2227
2228         list_for_each (ptmp, peers) {
2229                 peer = list_entry (ptmp, kib_peer_t, ibp_list);
2230
2231                 list_for_each (ctmp, &peer->ibp_conns) {
2232                         conn = list_entry (ctmp, kib_conn_t, ibc_list);
2233
2234                         LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);
2235
2236
2237                         /* In case we have enough credits to return via a
2238                          * NOOP, but there were no non-blocking tx descs
2239                          * free to do it last time... */
2240                         kibnal_check_sends(conn);
2241
2242                         if (!kibnal_conn_timed_out(conn))
2243                                 continue;
2244                         
2245                         CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
2246                                conn, conn->ibc_state, peer->ibp_nid,
2247                                atomic_read (&conn->ibc_refcount));
2248
2249                         atomic_inc (&conn->ibc_refcount);
2250                         read_unlock_irqrestore(&kibnal_data.kib_global_lock,
2251                                                flags);
2252
2253                         CERROR("Timed out RDMA with "LPX64"\n",
2254                                peer->ibp_nid);
2255
2256                         kibnal_close_conn (conn, -ETIMEDOUT);
2257                         kibnal_put_conn (conn);
2258
2259                         /* start again now I've dropped the lock */
2260                         goto again;
2261                 }
2262         }
2263
2264         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2265 }
2266
2267 void
2268 kibnal_terminate_conn (kib_conn_t *conn)
2269 {
2270         int           rc;
2271
2272         CDEBUG(D_NET, "conn %p\n", conn);
2273         LASSERT (conn->ibc_state == IBNAL_CONN_DEATHROW);
2274         conn->ibc_state = IBNAL_CONN_ZOMBIE;
2275
2276         rc = ib_cm_disconnect (conn->ibc_comm_id);
2277         if (rc != 0)
2278                 CERROR ("Error %d disconnecting conn %p -> "LPX64"\n",
2279                         rc, conn, conn->ibc_peer->ibp_nid);
2280 }
2281
2282 int
2283 kibnal_reaper (void *arg)
2284 {
2285         wait_queue_t       wait;
2286         unsigned long      flags;
2287         kib_conn_t        *conn;
2288         int                timeout;
2289         int                i;
2290         int                peer_index = 0;
2291         unsigned long      deadline = jiffies;
2292         
2293         kportal_daemonize ("kibnal_reaper");
2294         kportal_blockallsigs ();
2295
2296         init_waitqueue_entry (&wait, current);
2297
2298         spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
2299
2300         while (!kibnal_data.kib_shutdown) {
2301                 if (!list_empty (&kibnal_data.kib_reaper_conns)) {
2302                         conn = list_entry (kibnal_data.kib_reaper_conns.next,
2303                                            kib_conn_t, ibc_list);
2304                         list_del (&conn->ibc_list);
2305                         
2306                         spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);
2307
2308                         switch (conn->ibc_state) {
2309                         case IBNAL_CONN_DEATHROW:
2310                                 LASSERT (conn->ibc_comm_id != TS_IB_CM_COMM_ID_INVALID);
2311                                 /* Disconnect: conn becomes a zombie in the
2312                                  * callback and last ref reschedules it
2313                                  * here... */
2314                                 kibnal_terminate_conn(conn);
2315                                 kibnal_put_conn (conn);
2316                                 break;
2317                                 
2318                         case IBNAL_CONN_ZOMBIE:
2319                                 kibnal_destroy_conn (conn);
2320                                 break;
2321                                 
2322                         default:
2323                                 CERROR ("Bad conn %p state: %d\n",
2324                                         conn, conn->ibc_state);
2325                                 LBUG();
2326                         }
2327
2328                         spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
2329                         continue;
2330                 }
2331
2332                 spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);
2333
2334                 /* careful with the jiffy wrap... */
2335                 while ((timeout = (int)(deadline - jiffies)) <= 0) {
2336                         const int n = 4;
2337                         const int p = 1;
2338                         int       chunk = kibnal_data.kib_peer_hash_size;
2339                         
2340                         /* Time to check for RDMA timeouts on a few more
2341                          * peers: I do checks every 'p' seconds on a
2342                          * proportion of the peer table and I need to check
2343                          * every connection 'n' times within a timeout
2344                          * interval, to ensure I detect a timeout on any
2345                          * connection within (n+1)/n times the timeout
2346                          * interval. */
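                        /* For example, with kib_io_timeout = 60 and a
                         * 1024-bucket peer table (illustrative values
                         * only), chunk = 1024*4*1/60 = 68, so the whole
                         * table is scanned roughly every 15 seconds,
                         * i.e. every timeout/n interval. */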
2347
2348                         if (kibnal_tunables.kib_io_timeout > n * p)
2349                                 chunk = (chunk * n * p) / 
2350                                         kibnal_tunables.kib_io_timeout;
2351                         if (chunk == 0)
2352                                 chunk = 1;
2353
2354                         for (i = 0; i < chunk; i++) {
2355                                 kibnal_check_conns (peer_index);
2356                                 peer_index = (peer_index + 1) % 
2357                                              kibnal_data.kib_peer_hash_size;
2358                         }
2359
2360                         deadline += p * HZ;
2361                 }
2362
2363                 kibnal_data.kib_reaper_waketime = jiffies + timeout;
2364
2365                 set_current_state (TASK_INTERRUPTIBLE);
2366                 add_wait_queue (&kibnal_data.kib_reaper_waitq, &wait);
2367
2368                 schedule_timeout (timeout);
2369
2370                 set_current_state (TASK_RUNNING);
2371                 remove_wait_queue (&kibnal_data.kib_reaper_waitq, &wait);
2372
2373                 spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
2374         }
2375
2376         spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);
2377
2378         kibnal_thread_fini ();
2379         return (0);
2380 }
2381
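/* Connection daemon: services incoming connection/service-query sockets
 * and initiates active connections to peers queued for connecting. */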
2382 int
2383 kibnal_connd (void *arg)
2384 {
2385         long               id = (long)arg;
2386         char               name[16];
2387         wait_queue_t       wait;
2388         unsigned long      flags;
2389         kib_peer_t        *peer;
2390         kib_acceptsock_t  *as;
2391         int                did_something;
2392
2393         snprintf(name, sizeof(name), "kibnal_connd_%02ld", id);
2394         kportal_daemonize(name);
2395         kportal_blockallsigs();
2396
2397         init_waitqueue_entry (&wait, current);
2398
2399         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2400
2401         while (!kibnal_data.kib_shutdown) {
2402                 did_something = 0;
2403
2404                 if (!list_empty (&kibnal_data.kib_connd_acceptq)) {
2405                         as = list_entry (kibnal_data.kib_connd_acceptq.next,
2406                                          kib_acceptsock_t, ibas_list);
2407                         list_del (&as->ibas_list);
2408                         
2409                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2410
2411                         kibnal_handle_svcqry(as->ibas_sock);
2412                         sock_release(as->ibas_sock);
2413                         PORTAL_FREE(as, sizeof(*as));
2414                         
2415                         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2416                         did_something = 1;
2417                 }
2418                         
2419                 if (!list_empty (&kibnal_data.kib_connd_peers)) {
2420                         peer = list_entry (kibnal_data.kib_connd_peers.next,
2421                                            kib_peer_t, ibp_connd_list);
2422                         
2423                         list_del_init (&peer->ibp_connd_list);
2424                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2425
2426                         kibnal_connect_peer (peer);
2427                         kibnal_put_peer (peer);
2428
2429                         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
2430                         did_something = 1;
2431                 }
2432
2433                 if (did_something)
2434                         continue;
2435
2436                 set_current_state (TASK_INTERRUPTIBLE);
2437                 add_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
2438
2439                 spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2440
2441                 schedule();
2442
2443                 set_current_state (TASK_RUNNING);
2444                 remove_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
2445
2446                 spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
2447         }
2448
2449         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2450
2451         kibnal_thread_fini ();
2452         return (0);
2453 }
2454
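/* Scheduler thread: completes transmit descriptors that were deferred to
 * thread context (e.g. memory deregistration can't run in an IRQ) and
 * handles received messages. */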
2455 int
2456 kibnal_scheduler(void *arg)
2457 {
2458         long            id = (long)arg;
2459         char            name[16];
2460         kib_rx_t       *rx;
2461         kib_tx_t       *tx;
2462         unsigned long   flags;
2463         int             rc;
2464         int             counter = 0;
2465         int             did_something;
2466
2467         snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
2468         kportal_daemonize(name);
2469         kportal_blockallsigs();
2470
2471         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
2472
2473         while (!kibnal_data.kib_shutdown) {
2474                 did_something = 0;
2475
2476                 while (!list_empty(&kibnal_data.kib_sched_txq)) {
2477                         tx = list_entry(kibnal_data.kib_sched_txq.next,
2478                                         kib_tx_t, tx_list);
2479                         list_del(&tx->tx_list);
2480                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
2481                                                flags);
2482                         kibnal_tx_done(tx);
2483
2484                         spin_lock_irqsave(&kibnal_data.kib_sched_lock,
2485                                           flags);
2486                 }
2487
2488                 if (!list_empty(&kibnal_data.kib_sched_rxq)) {
2489                         rx = list_entry(kibnal_data.kib_sched_rxq.next,
2490                                         kib_rx_t, rx_list);
2491                         list_del(&rx->rx_list);
2492                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
2493                                                flags);
2494
2495                         kibnal_rx(rx);
2496
2497                         did_something = 1;
2498                         spin_lock_irqsave(&kibnal_data.kib_sched_lock,
2499                                           flags);
2500                 }
2501
2502                 /* nothing to do or hogging CPU */
2503                 if (!did_something || counter++ == IBNAL_RESCHED) {
2504                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
2505                                                flags);
2506                         counter = 0;
2507
2508                         if (!did_something) {
2509                                 rc = wait_event_interruptible(
2510                                         kibnal_data.kib_sched_waitq,
2511                                         !list_empty(&kibnal_data.kib_sched_txq) || 
2512                                         !list_empty(&kibnal_data.kib_sched_rxq) || 
2513                                         kibnal_data.kib_shutdown);
2514                         } else {
2515                                 our_cond_resched();
2516                         }
2517
2518                         spin_lock_irqsave(&kibnal_data.kib_sched_lock,
2519                                           flags);
2520                 }
2521         }
2522
2523         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
2524
2525         kibnal_thread_fini();
2526         return (0);
2527 }
2528
2529
2530 lib_nal_t kibnal_lib = {
2531         libnal_data:        &kibnal_data,      /* NAL private data */
2532         libnal_send:         kibnal_send,
2533         libnal_send_pages:   kibnal_send_pages,
2534         libnal_recv:         kibnal_recv,
2535         libnal_recv_pages:   kibnal_recv_pages,
2536         libnal_dist:         kibnal_dist
2537 };