lnet/klnds/viblnd/viblnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *   Author: Frank Zago <fzago@systemfabricworks.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  */
24
25 #include "vibnal.h"
26
27 static void kibnal_cm_callback(cm_cep_handle_t cep, cm_conn_data_t *info, void *arg);
28
29 /*
30  *  LIB functions follow
31  *
32  */
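/* Hand a tx to the scheduler thread for completion; used when the caller
 * (e.g. in IRQ context) can't deregister memory itself. */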
33 static void
34 kibnal_schedule_tx_done (kib_tx_t *tx)
35 {
36         unsigned long flags;
37
38         spin_lock_irqsave (&kibnal_data.kib_sched_lock, flags);
39
40         list_add_tail(&tx->tx_list, &kibnal_data.kib_sched_txq);
41         wake_up (&kibnal_data.kib_sched_waitq);
42
43         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
44 }
45
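/* Finish with a tx descriptor: unmap it, finalise any attached lib messages,
 * drop its connection ref and return it to the appropriate idle list. */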
46 static void
47 kibnal_tx_done (kib_tx_t *tx)
48 {
49         ptl_err_t        ptlrc = (tx->tx_status == 0) ? PTL_OK : PTL_FAIL;
50         unsigned long    flags;
51         int              i;
52         vv_return_t retval;
53
54         LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting callback */
55         LASSERT (!tx->tx_passive_rdma_wait);    /* mustn't be awaiting RDMA */
56
57         switch (tx->tx_mapped) {
58         default:
59                 LBUG();
60
61         case KIB_TX_UNMAPPED:
62                 break;
63
64         case KIB_TX_MAPPED:
65                 if (in_interrupt()) {
66                         /* can't deregister memory in IRQ context... */
67                         kibnal_schedule_tx_done(tx);
68                         return;
69                 }
70                 retval = vv_mem_region_destroy(kibnal_data.kib_hca, tx->tx_md.md_handle);
71                 LASSERT (retval == vv_return_ok);
72                 tx->tx_mapped = KIB_TX_UNMAPPED;
73                 break;
74
75 #if IBNAL_FMR
76         case KIB_TX_MAPPED_FMR:
77                 if (in_interrupt() && tx->tx_status != 0) {
78                         /* can't flush FMRs in IRQ context... */
79                         kibnal_schedule_tx_done(tx);
80                         return;
81                 }              
82
83                 rc = ib_fmr_deregister(tx->tx_md.md_handle.fmr);
84                 LASSERT (rc == 0);
85
86                 if (tx->tx_status != 0)
87                         ib_fmr_pool_force_flush(kibnal_data.kib_fmr_pool);
88                 tx->tx_mapped = KIB_TX_UNMAPPED;
89                 break;
90 #endif
91         }
92
93         for (i = 0; i < 2; i++) {
94                 /* tx may have up to 2 libmsgs to finalise */
95                 if (tx->tx_libmsg[i] == NULL)
96                         continue;
97
98                 lib_finalize (&kibnal_lib, NULL, tx->tx_libmsg[i], ptlrc);
99                 tx->tx_libmsg[i] = NULL;
100         }
101         
102         if (tx->tx_conn != NULL) {
103                 kibnal_put_conn (tx->tx_conn);
104                 tx->tx_conn = NULL;
105         }
106
107         tx->tx_nsp = 0;
108         tx->tx_passive_rdma = 0;
109         tx->tx_status = 0;
110
111         spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);
112
113         if (tx->tx_isnblk) {
114                 list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_nblk_txs);
115         } else {
116                 list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_txs);
117                 wake_up (&kibnal_data.kib_idle_tx_waitq);
118         }
119
120         spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
121 }
122
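/* Grab an idle tx descriptor.  Blocking callers wait for a "normal" one;
 * non-blocking callers may dip into the reserved (nblk) pool. */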
123 static kib_tx_t *
124 kibnal_get_idle_tx (int may_block) 
125 {
126         unsigned long  flags;
127         kib_tx_t      *tx = NULL;
128         ENTRY;
129         
130         for (;;) {
131                 spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);
132
133                 /* "normal" descriptor is free */
134                 if (!list_empty (&kibnal_data.kib_idle_txs)) {
135                         tx = list_entry (kibnal_data.kib_idle_txs.next,
136                                          kib_tx_t, tx_list);
137                         break;
138                 }
139
140                 if (!may_block) {
141                         /* may dip into reserve pool */
142                         if (list_empty (&kibnal_data.kib_idle_nblk_txs)) {
143                                 CERROR ("reserved tx desc pool exhausted\n");
144                                 break;
145                         }
146
147                         tx = list_entry (kibnal_data.kib_idle_nblk_txs.next,
148                                          kib_tx_t, tx_list);
149                         break;
150                 }
151
152                 /* block for idle tx */
153                 spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
154
155                 wait_event (kibnal_data.kib_idle_tx_waitq,
156                             !list_empty (&kibnal_data.kib_idle_txs) ||
157                             kibnal_data.kib_shutdown);
158         }
159
160         if (tx != NULL) {
161                 list_del (&tx->tx_list);
162
163                 /* Allocate a new passive RDMA completion cookie.  It might
164                  * not be needed, but we've got a lock right now and we're
165                  * unlikely to wrap... */
166                 tx->tx_passive_rdma_cookie = kibnal_data.kib_next_tx_cookie++;
167
168                 LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
169                 LASSERT (tx->tx_nsp == 0);
170                 LASSERT (tx->tx_sending == 0);
171                 LASSERT (tx->tx_status == 0);
172                 LASSERT (tx->tx_conn == NULL);
173                 LASSERT (!tx->tx_passive_rdma);
174                 LASSERT (!tx->tx_passive_rdma_wait);
175                 LASSERT (tx->tx_libmsg[0] == NULL);
176                 LASSERT (tx->tx_libmsg[1] == NULL);
177         }
178
179         spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
180         
181         RETURN(tx);
182 }
183
184 static int
185 kibnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
186 {
187         /* I would guess that if kibnal_get_peer (nid) == NULL,
188            and we're not routing, then 'nid' is very distant :) */
189         if ( nal->libnal_ni.ni_pid.nid == nid ) {
190                 *dist = 0;
191         } else {
192                 *dist = 1;
193         }
194
195         return 0;
196 }
197
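/* The peer has signalled completion of the passive RDMA identified by
 * 'cookie'; find the matching tx on this connection and complete it with
 * 'status'. */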
198 static void
199 kibnal_complete_passive_rdma(kib_conn_t *conn, __u64 cookie, int status)
200 {
201         struct list_head *ttmp;
202         unsigned long     flags;
203         int               idle;
204
205         spin_lock_irqsave (&conn->ibc_lock, flags);
206
207         list_for_each (ttmp, &conn->ibc_active_txs) {
208                 kib_tx_t *tx = list_entry(ttmp, kib_tx_t, tx_list);
209
210                 LASSERT (tx->tx_passive_rdma ||
211                          !tx->tx_passive_rdma_wait);
212
213                 LASSERT (tx->tx_passive_rdma_wait ||
214                          tx->tx_sending != 0);
215
216                 if (!tx->tx_passive_rdma_wait ||
217                     tx->tx_passive_rdma_cookie != cookie)
218                         continue;
219
220                 CDEBUG(D_NET, "Complete %p "LPD64": %d\n", tx, cookie, status);
221
222                 tx->tx_status = status;
223                 tx->tx_passive_rdma_wait = 0;
224                 idle = (tx->tx_sending == 0);
225
226                 if (idle)
227                         list_del (&tx->tx_list);
228
229                 spin_unlock_irqrestore (&conn->ibc_lock, flags);
230
231                 /* I could be racing with tx callbacks.  It's whoever
232                  * _makes_ tx idle that frees it */
233                 if (idle)
234                         kibnal_tx_done (tx);
235                 return;
236         }
237                 
238         spin_unlock_irqrestore (&conn->ibc_lock, flags);
239
240         CERROR ("Unmatched (late?) RDMA completion "LPX64" from "LPX64"\n",
241                 cookie, conn->ibc_peer->ibp_nid);
242 }
243
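/* (Re-)post a receive buffer on its connection; if do_credits is set, also
 * return a credit to the peer via kibnal_check_sends(). */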
244 static void
245 kibnal_post_rx (kib_rx_t *rx, int do_credits)
246 {
247         kib_conn_t   *conn = rx->rx_conn;
248         int           rc = 0;
249         unsigned long flags;
250         vv_return_t retval;
251
252         ENTRY;
253         
254         rx->rx_gl = (vv_scatgat_t) {
255                 .v_address = (void *)rx->rx_msg,
256                 .length    = IBNAL_MSG_SIZE,
257                 .l_key     = rx->l_key,
258         };
259
260         rx->rx_wrq = (vv_wr_t) {
261                 .wr_id                   = kibnal_ptr2wreqid(rx, 1),
262                 .completion_notification = 1,
263                 .scatgat_list            = &rx->rx_gl,
264                 .num_of_data_segments    = 1,
265                 .wr_type                 = vv_wr_receive,
266         };
267
268         KIB_ASSERT_CONN_STATE_RANGE(conn, IBNAL_CONN_ESTABLISHED,
269                                     IBNAL_CONN_DREP);
270         LASSERT (!rx->rx_posted);
271         rx->rx_posted = 1;
272         mb();
273
274         if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
275                 rc = -ECONNABORTED;
276         else {
277                 retval = vv_post_receive(kibnal_data.kib_hca, conn->ibc_qp, &rx->rx_wrq);
278
279                 if (retval) {
280                         CDEBUG(D_NET, "post failed %d\n", retval);
281                         rc = -EINVAL;
282                 } else
283                         CDEBUG(D_NET, "posted rx %p\n", &rx->rx_wrq);
284         }
285
286         if (rc == 0) {
287                 if (do_credits) {
288                         spin_lock_irqsave(&conn->ibc_lock, flags);
289                         conn->ibc_outstanding_credits++;
290                         spin_unlock_irqrestore(&conn->ibc_lock, flags);
291
292                         kibnal_check_sends(conn);
293                 }
294                 EXIT;
295                 return;
296         }
297
298         if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
299                 CERROR ("Error posting receive -> "LPX64": %d\n",
300                         conn->ibc_peer->ibp_nid, rc);
301                 kibnal_close_conn (rx->rx_conn, rc);
302         } else {
303                 CDEBUG (D_NET, "Error posting receive -> "LPX64": %d\n",
304                         conn->ibc_peer->ibp_nid, rc);
305         }
306
307         /* Drop rx's ref */
308         kibnal_put_conn (conn);
309         EXIT;
310 }
311
312 #if IBNAL_CKSUM
313 static inline __u32 kibnal_cksum (void *ptr, int nob)
314 {
315         char  *c  = ptr;
316         __u32  sum = 0;
317
318         while (nob-- > 0)
319                 sum = ((sum << 1) | (sum >> 31)) + *c++;
320         
321         return (sum);
322 }
323 #endif
324
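/* Completion handler for receive work requests: validate (and byte-flip if
 * needed) the incoming message, handle returned credits, then either consume
 * it here or hand it to a scheduler thread via kibnal_rx(). */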
325 static void
326 kibnal_rx_callback (vv_wc_t *wc)
327 {
328         kib_rx_t     *rx = (kib_rx_t *)kibnal_wreqid2ptr(wc->wr_id);
329         kib_msg_t    *msg = rx->rx_msg;
330         kib_conn_t   *conn = rx->rx_conn;
331         int           nob = wc->num_bytes_transfered;
332         const int     base_nob = offsetof(kib_msg_t, ibm_u);
333         int           credits;
334         int           flipped;
335         unsigned long flags;
336         __u32         i;
337 #if IBNAL_CKSUM
338         __u32         msg_cksum;
339         __u32         computed_cksum;
340 #endif
341
342         /* we only move the QP to the error state after we've finished
343          * disconnecting; maybe we should do so sooner. */
344         KIB_ASSERT_CONN_STATE_RANGE(conn, IBNAL_CONN_ESTABLISHED, 
345                                     IBNAL_CONN_DISCONNECTED);
346
347         CDEBUG(D_NET, "rx %p conn %p, nob=%d\n", rx, conn, nob);
348
349         LASSERT (rx->rx_posted);
350         rx->rx_posted = 0;
351         mb();
352
353         /* receives complete with error in any case after we've started
354          * disconnecting */
355         if (conn->ibc_state > IBNAL_CONN_ESTABLISHED)
356                 goto failed;
357
358         if (wc->completion_status != vv_comp_status_success) {
359                 CERROR("Rx from "LPX64" failed: %d\n", 
360                        conn->ibc_peer->ibp_nid, wc->completion_status);
361                 goto failed;
362         }
363
364         if (nob < base_nob) {
365                 CERROR ("Short rx from "LPX64": %d < expected %d\n",
366                         conn->ibc_peer->ibp_nid, nob, base_nob);
367                 goto failed;
368         }
369
370         /* Receiver does any byte flipping if necessary... */
371
372         if (msg->ibm_magic == IBNAL_MSG_MAGIC) {
373                 flipped = 0;
374         } else {
375                 if (msg->ibm_magic != __swab32(IBNAL_MSG_MAGIC)) {
376                         CERROR ("Unrecognised magic: %08x from "LPX64"\n", 
377                                 msg->ibm_magic, conn->ibc_peer->ibp_nid);
378                         goto failed;
379                 }
380                 flipped = 1;
381                 __swab16s (&msg->ibm_version);
382                 LASSERT (sizeof(msg->ibm_type) == 1);
383                 LASSERT (sizeof(msg->ibm_credits) == 1);
384         }
385
386         if (msg->ibm_version != IBNAL_MSG_VERSION) {
387                 CERROR ("Incompatible msg version %d (%d expected)\n",
388                         msg->ibm_version, IBNAL_MSG_VERSION);
389                 goto failed;
390         }
391
392 #if IBNAL_CKSUM
393         if (nob != msg->ibm_nob) {
394                 CERROR ("Unexpected # bytes %d (%d expected)\n", nob, msg->ibm_nob);
395                 goto failed;
396         }
397
398         msg_cksum = le32_to_cpu(msg->ibm_cksum);
399         msg->ibm_cksum = 0;
400         computed_cksum = kibnal_cksum (msg, nob);
401         
402         if (msg_cksum != computed_cksum) {
403                 CERROR ("Checksum failure %d (%d expected)\n",
404                         computed_cksum, msg_cksum);
405 //                goto failed;
406         }
407         CDEBUG(D_NET, "cksum %x, nob %d\n", computed_cksum, nob);
408 #endif
409
410         /* Have I received credits that will let me send? */
411         credits = msg->ibm_credits;
412         if (credits != 0) {
413                 spin_lock_irqsave(&conn->ibc_lock, flags);
414                 conn->ibc_credits += credits;
415                 spin_unlock_irqrestore(&conn->ibc_lock, flags);
416                 
417                 kibnal_check_sends(conn);
418         }
419
420         switch (msg->ibm_type) {
421         case IBNAL_MSG_NOOP:
422                 kibnal_post_rx (rx, 1);
423                 return;
424
425         case IBNAL_MSG_IMMEDIATE:
426                 if (nob < base_nob + sizeof (kib_immediate_msg_t)) {
427                         CERROR ("Short IMMEDIATE from "LPX64": %d\n",
428                                 conn->ibc_peer->ibp_nid, nob);
429                         goto failed;
430                 }
431                 break;
432                 
433         case IBNAL_MSG_PUT_RDMA:
434         case IBNAL_MSG_GET_RDMA:
435                 if (nob < base_nob + sizeof (kib_rdma_msg_t)) {
436                         CERROR ("Short RDMA msg from "LPX64": %d\n",
437                                 conn->ibc_peer->ibp_nid, nob);
438                         goto failed;
439                 }
440                 if (flipped)
441                         __swab32s(&msg->ibm_u.rdma.ibrm_num_descs);
442
443                 CDEBUG(D_NET, "%d RDMA: cookie "LPX64":\n",
444                        msg->ibm_type, msg->ibm_u.rdma.ibrm_cookie);
445
446                 if ((msg->ibm_u.rdma.ibrm_num_descs > PTL_MD_MAX_IOV) ||
447                     (kib_rdma_msg_len(msg->ibm_u.rdma.ibrm_num_descs) > 
448                      min(nob, IBNAL_MSG_SIZE))) {
449                         CERROR ("num_descs %d too large\n", 
450                                 msg->ibm_u.rdma.ibrm_num_descs);
451                         goto failed;
452                 }
453
454                 if (flipped) {
455                         __swab32s(&msg->ibm_u.rdma.rd_key);
456                 }
457
458                 for(i = 0; i < msg->ibm_u.rdma.ibrm_num_descs; i++) {
459                         kib_rdma_desc_t *desc = &msg->ibm_u.rdma.ibrm_desc[i];
460
461                         if (flipped) {
462                                 __swab32s(&desc->rd_nob);
463                                 __swab64s(&desc->rd_addr);
464                         }
465
466                         CDEBUG(D_NET, "  key %x, " "addr "LPX64", nob %u\n",
467                                msg->ibm_u.rdma.rd_key, desc->rd_addr, desc->rd_nob);
468                 }
469                 break;
470                         
471         case IBNAL_MSG_PUT_DONE:
472         case IBNAL_MSG_GET_DONE:
473                 if (nob < base_nob + sizeof (kib_completion_msg_t)) {
474                         CERROR ("Short COMPLETION msg from "LPX64": %d\n",
475                                 conn->ibc_peer->ibp_nid, nob);
476                         goto failed;
477                 }
478                 if (flipped)
479                         __swab32s(&msg->ibm_u.completion.ibcm_status);
480                 
481                 CDEBUG(D_NET, "%d DONE: cookie "LPX64", status %d\n",
482                        msg->ibm_type, msg->ibm_u.completion.ibcm_cookie,
483                        msg->ibm_u.completion.ibcm_status);
484
485                 kibnal_complete_passive_rdma (conn, 
486                                               msg->ibm_u.completion.ibcm_cookie,
487                                               msg->ibm_u.completion.ibcm_status);
488                 kibnal_post_rx (rx, 1);
489                 return;
490                         
491         default:
492                 CERROR ("Can't parse type from "LPX64": %d\n",
493                         conn->ibc_peer->ibp_nid, msg->ibm_type);
494                 goto failed;
495         }
496
497         /* schedule for kibnal_rx() in thread context */
498         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
499         
500         list_add_tail (&rx->rx_list, &kibnal_data.kib_sched_rxq);
501         wake_up (&kibnal_data.kib_sched_waitq);
502         
503         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
504
505         return;
506         
507  failed:
508         CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
509         kibnal_close_conn(conn, -ECONNABORTED);
510
511         /* Don't re-post rx & drop its ref on conn */
512         kibnal_put_conn(conn);
513 }
514
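/* Thread-context handler for a received message: pass it to lib_parse(),
 * send a failed completion for an unmatched GET so the peer doesn't block
 * for the full timeout, then re-post the buffer. */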
515 static void
516 kibnal_rx (kib_rx_t *rx)
517 {
518         kib_msg_t   *msg = rx->rx_msg;
519
520         /* Clear flag so I can detect if I've sent an RDMA completion */
521         rx->rx_rdma = 0;
522
523         switch (msg->ibm_type) {
524         case IBNAL_MSG_GET_RDMA:
525                 lib_parse(&kibnal_lib, &msg->ibm_u.rdma.ibrm_hdr, rx);
526                 /* If the incoming get was matched, I'll have initiated the
527                  * RDMA and the completion message... */
528                 if (rx->rx_rdma)
529                         break;
530
531                 /* Otherwise, I'll send a failed completion now to prevent
532                  * the peer's GET blocking for the full timeout. */
533                 CERROR ("Completing unmatched RDMA GET from "LPX64"\n",
534                         rx->rx_conn->ibc_peer->ibp_nid);
535                 kibnal_start_active_rdma (IBNAL_MSG_GET_DONE, -EIO,
536                                           rx, NULL, 0, NULL, NULL, 0, 0);
537                 break;
538                 
539         case IBNAL_MSG_PUT_RDMA:
540                 lib_parse(&kibnal_lib, &msg->ibm_u.rdma.ibrm_hdr, rx);
541                 if (rx->rx_rdma)
542                         break;
543                 /* This is most unusual, since even if lib_parse() didn't
544                  * match anything, it should have asked us to read (and
545                  * discard) the payload.  The portals header must be
546                  * inconsistent with this message type, so it's the
547                  * sender's fault for sending garbage and she can time
548                  * herself out... */
549                 CERROR ("Uncompleted RDMA PUT from "LPX64"\n",
550                         rx->rx_conn->ibc_peer->ibp_nid);
551                 break;
552
553         case IBNAL_MSG_IMMEDIATE:
554                 lib_parse(&kibnal_lib, &msg->ibm_u.immediate.ibim_hdr, rx);
555                 LASSERT (!rx->rx_rdma);
556                 break;
557                 
558         default:
559                 LBUG();
560                 break;
561         }
562
563         kibnal_post_rx (rx, 1);
564 }
565
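/* Translate a kernel virtual address (vmalloc, highmem pkmap or direct-mapped)
 * into its struct page, or NULL if it isn't backed by a valid page. */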
566 static struct page *
567 kibnal_kvaddr_to_page (unsigned long vaddr)
568 {
569         struct page *page;
570
571         if (vaddr >= VMALLOC_START &&
572             vaddr < VMALLOC_END)
573                 page = vmalloc_to_page ((void *)vaddr);
574 #if CONFIG_HIGHMEM
575         else if (vaddr >= PKMAP_BASE &&
576                  vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
577                 page = vmalloc_to_page ((void *)vaddr);
578         /* in 2.4 ^ just walks the page tables */
579 #endif
580         else
581                 page = virt_to_page (vaddr);
582
583         if (!VALID_PAGE (page))
584                 page = NULL;
585
586         return page;
587 }
588
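/* Append one page fragment to the tx message's RDMA descriptor list, using
 * the local key for active transfers and the remote (advertised) key and
 * address otherwise. */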
589 static void
590 kibnal_fill_ibrm(kib_tx_t *tx, struct page *page, unsigned long page_offset,
591                  unsigned long len, int active)
592 {
593         kib_rdma_msg_t *ibrm = &tx->tx_msg->ibm_u.rdma;
594         kib_rdma_desc_t *desc;
595         vv_l_key_t l_key;
596         vv_r_key_t r_key;
597         void *addr;
598         vv_mem_reg_h_t mem_h;
599         vv_return_t retval;
600
601         LASSERTF(ibrm->ibrm_num_descs < PTL_MD_MAX_IOV, "%u\n", 
602                  ibrm->ibrm_num_descs);
603
604         desc = &ibrm->ibrm_desc[ibrm->ibrm_num_descs];
605
606         addr = page_address(page) + page_offset;
607
608         /* TODO: This next step is only needed to get either the lkey
609          * or the rkey. However, they should be the same as for the
610          * tx buffer, so we might as well use it. */
611         retval = vv_get_gen_mr_attrib(kibnal_data.kib_hca,
612                                       addr,
613                                       len,
614                                       &mem_h,
615                                       &l_key,
616                                       &r_key);
617         if (retval) {
618                 CERROR("vv_get_gen_mr_attrib failed: %d\n", retval);
619                 /* TODO: this shouldn't really fail, but what if? */
620                 return;
621         }
622
623         if (active) {
624                 ibrm->rd_key = l_key;
625         } else {
626                 ibrm->rd_key = r_key;
627
628                 vv_va2advertise_addr(kibnal_data.kib_hca, addr, &addr);
629         }
630
631         desc->rd_addr = (__u64)(unsigned long)addr;
632         desc->rd_nob = len;
633
634         ibrm->ibrm_num_descs++;
635 }
636
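/* Walk a contiguous kernel virtual range page by page, adding an RDMA
 * descriptor for each fragment (whole-memory registration mode). */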
637 static int
638 kibnal_map_rdma_iov(kib_tx_t *tx, unsigned long vaddr, int nob, int active)
639 {
640         struct page *page;
641         int page_offset, len;
642
643         while (nob > 0) {
644                 page = kibnal_kvaddr_to_page(vaddr);
645                 if (page == NULL)
646                         return -EFAULT;
647
648                 page_offset = vaddr & (PAGE_SIZE - 1);
649                 len = min(nob, (int)PAGE_SIZE - page_offset);
650                 
651                 kibnal_fill_ibrm(tx, page, page_offset, len, active);
652                 nob -= len;
653                 vaddr += len;
654         }
655
656         return 0;
657 }
658
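/* Map a single iovec fragment for RDMA: describe it page by page in
 * whole-memory mode, otherwise register the region with the HCA. */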
659 static int
660 kibnal_map_iov (kib_tx_t *tx, vv_access_con_bit_mask_t access,
661                  int niov, struct iovec *iov, int offset, int nob, int active)
662                  
663 {
664         void   *vaddr;
665         vv_return_t retval;
666
667         LASSERT (nob > 0);
668         LASSERT (niov > 0);
669         LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
670
671         while (offset >= iov->iov_len) {
672                 offset -= iov->iov_len;
673                 niov--;
674                 iov++;
675                 LASSERT (niov > 0);
676         }
677
678         if (nob > iov->iov_len - offset) {
679                 CERROR ("Can't map multiple vaddr fragments\n");
680                 return (-EMSGSIZE);
681         }
682
683         /* our large contiguous iov could be backed by multiple physical
684          * pages. */
685         if (kibnal_whole_mem()) {
686                 int rc;
687                 tx->tx_msg->ibm_u.rdma.ibrm_num_descs = 0;
688                 rc = kibnal_map_rdma_iov(tx, (unsigned long)iov->iov_base + 
689                                          offset, nob, active);
690                 if (rc != 0) {
691                         CERROR ("Can't map iov: %d\n", rc);
692                         return rc;
693                 }
694                 return 0;
695         }
696
697         vaddr = (void *)(((unsigned long)iov->iov_base) + offset);
698         tx->tx_md.md_addr = (__u64)((unsigned long)vaddr);
699
700         retval = vv_mem_region_register(kibnal_data.kib_hca, vaddr, nob,
701                                    kibnal_data.kib_pd, access,
702                                    &tx->tx_md.md_handle, &tx->tx_md.md_lkey,
703                                    &tx->tx_md.md_rkey);
704         if (retval != 0) {
705                 CERROR ("Can't map vaddr %p: %d\n", vaddr, retval);
706                 return -EINVAL;
707         }
708
709         tx->tx_mapped = KIB_TX_MAPPED;
710         return (0);
711 }
712
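/* Map a kiov (page array) for RDMA, checking that the payload is contiguous
 * in I/O virtual memory, then either describe it page by page (whole-memory
 * mode) or register the physical pages with the HCA. */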
713 static int
714 kibnal_map_kiov (kib_tx_t *tx, vv_access_con_bit_mask_t access,
715                   int nkiov, ptl_kiov_t *kiov,
716                   int offset, int nob, int active)
717 {
718         vv_phy_list_t  phys_pages;
719         vv_phy_buf_t  *phys_buf = NULL;
720         int            page_offset;
721         int            nphys;
722         int            resid;
723         int            phys_size = 0;
724         int            i, rc = 0;
725         vv_return_t    retval;
726
727         CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
728
729         LASSERT (nob > 0);
730         LASSERT (nkiov > 0);
731         LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
732
733         while (offset >= kiov->kiov_len) {
734                 offset -= kiov->kiov_len;
735                 nkiov--;
736                 kiov++;
737                 LASSERT (nkiov > 0);
738         }
739
740         page_offset = kiov->kiov_offset + offset;
741         nphys = 1;
742
743         if (!kibnal_whole_mem()) {
744                 phys_size = nkiov * sizeof(vv_phy_buf_t);
745                 PORTAL_ALLOC(phys_buf, phys_size);
746
747                 if (phys_buf == NULL) {
748                         CERROR ("Can't allocate phys_buf\n");
749                         return (-ENOMEM);
750                 }
751
752                 phys_buf[0].start = kibnal_page2phys(kiov->kiov_page);
753                 phys_buf[0].size = PAGE_SIZE;
754
755         } else {
756                 tx->tx_msg->ibm_u.rdma.ibrm_num_descs = 0;
757                 kibnal_fill_ibrm(tx, kiov->kiov_page, kiov->kiov_offset, 
758                                  kiov->kiov_len, active);
759         }
760
761         resid = nob - (kiov->kiov_len - offset);
762
763         while (resid > 0) {
764                 kiov++;
765                 nkiov--;
766                 LASSERT (nkiov > 0);
767
768                 if (kiov->kiov_offset != 0 ||
769                     ((resid > PAGE_SIZE) && 
770                      kiov->kiov_len < PAGE_SIZE)) {
771                         /* Can't have gaps */
772                         CERROR ("Can't make payload contiguous in I/O VM: "
773                                 "page %d, offset %d, len %d\n", nphys,
774                                 kiov->kiov_offset, kiov->kiov_len);
775
776                         for (i = -nphys; i < nkiov; i++) 
777                         {
778                                 CERROR("kiov[%d] %p +%d for %d\n",
779                                        i, kiov[i].kiov_page, kiov[i].kiov_offset, kiov[i].kiov_len);
780                         }
781                         
782                         rc = -EINVAL;
783                         goto out;
784                 }
785
786                 if (nphys == PTL_MD_MAX_IOV) {
787                         CERROR ("payload too big (%d)\n", nphys);
788                         rc = -EMSGSIZE;
789                         goto out;
790                 }
791
792                 if (!kibnal_whole_mem()) {
793                         LASSERT (nphys * sizeof (vv_phy_buf_t) < phys_size);
794                         phys_buf[nphys].start = kibnal_page2phys(kiov->kiov_page);
795                         phys_buf[nphys].size = PAGE_SIZE;
796
797                 } else {
798                         if (kib_rdma_msg_len(nphys) > IBNAL_MSG_SIZE) {
799                                 CERROR ("payload too big (%d)\n", nphys);
800                                 rc = -EMSGSIZE;
801                                 goto out;
802                         }
803                         kibnal_fill_ibrm(tx, kiov->kiov_page, 
804                                          kiov->kiov_offset, kiov->kiov_len,
805                                          active);
806                 }
807
808                 nphys ++;
809                 resid -= PAGE_SIZE;
810         }
811
812         if (kibnal_whole_mem())
813                 goto out;
814
815 #if 0
816         CWARN ("nphys %d, nob %d, page_offset %d\n", nphys, nob, page_offset);
817         for (i = 0; i < nphys; i++)
818                 CWARN ("   [%d] "LPX64"\n", i, phys_buf[i].start);
819 #endif
820
821 #if IBNAL_FMR
822 #error "vibnal hasn't learned about FMR yet"
823         rc = ib_fmr_register_physical (kibnal_data.kib_fmr_pool,
824                                        phys_pages, nphys,
825                                        &tx->tx_md.md_addr,
826                                        page_offset,
827                                        &tx->tx_md.md_handle.fmr,
828                                        &tx->tx_md.md_lkey,
829                                        &tx->tx_md.md_rkey);
830 #else
831         retval = vv_phy_mem_region_register(kibnal_data.kib_hca,
832                                             &phys_pages,
833                                             IBNAL_RDMA_BASE,
834                                             nphys,
835                                             0,          /* offset */
836                                             kibnal_data.kib_pd,
837                                             vv_acc_l_mem_write | vv_acc_r_mem_write | vv_acc_r_mem_read | vv_acc_mem_bind, /* TODO: translated as-is, but seems incorrect or too much */
838                                             &tx->tx_md.md_handle,
839                                             &tx->tx_md.md_addr,
840                                             &tx->tx_md.md_lkey,
841                                             &tx->tx_md.md_rkey);
842 #endif
843         if (retval == vv_return_ok) {
844                 CDEBUG(D_NET, "Mapped %d pages %d bytes @ offset %d: lkey %x, rkey %x\n",
845                        nphys, nob, page_offset, tx->tx_md.md_lkey, tx->tx_md.md_rkey);
846 #if IBNAL_FMR
847                 tx->tx_mapped = KIB_TX_MAPPED_FMR;
848 #else
849                 tx->tx_mapped = KIB_TX_MAPPED;
850 #endif
851         } else {
852                 CERROR ("Can't map phys_pages: %d\n", retval);
853                 rc = -EFAULT;
854         }
855
856  out:
857         if (phys_buf != NULL)
858                 PORTAL_FREE(phys_buf, phys_size);
859
860         return (rc);
861 }
862
863 static kib_conn_t *
864 kibnal_find_conn_locked (kib_peer_t *peer)
865 {
866         struct list_head *tmp;
867
868         /* just return the first connection */
869         list_for_each (tmp, &peer->ibp_conns) {
870                 return (list_entry(tmp, kib_conn_t, ibc_list));
871         }
872
873         return (NULL);
874 }
875
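/* Push queued sends out on 'conn' as credits and send-queue space allow,
 * returning credits with an explicit NOOP once the high-water mark is hit. */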
876 void
877 kibnal_check_sends (kib_conn_t *conn)
878 {
879         unsigned long   flags;
880         kib_tx_t       *tx;
881         int             rc;
882         int             i;
883         int             done;
884         int             nwork;
885
886         ENTRY;
887
888         spin_lock_irqsave (&conn->ibc_lock, flags);
889
890         LASSERT (conn->ibc_nsends_posted <= IBNAL_MSG_QUEUE_SIZE);
891
892         if (list_empty(&conn->ibc_tx_queue) &&
893             conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER) {
894                 spin_unlock_irqrestore(&conn->ibc_lock, flags);
895                 
896                 tx = kibnal_get_idle_tx(0);     /* don't block */
897                 if (tx != NULL)
898                         kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);
899
900                 spin_lock_irqsave(&conn->ibc_lock, flags);
901                 
902                 if (tx != NULL) {
903                         atomic_inc(&conn->ibc_refcount);
904                         kibnal_queue_tx_locked(tx, conn);
905                 }
906         }
907
908         while (!list_empty (&conn->ibc_tx_queue)) {
909                 tx = list_entry (conn->ibc_tx_queue.next, kib_tx_t, tx_list);
910
911                 /* We rely on this for QP sizing */
912                 LASSERT (tx->tx_nsp > 0 && tx->tx_nsp <= IBNAL_TX_MAX_SG);
913
914                 LASSERT (conn->ibc_outstanding_credits >= 0);
915                 LASSERT (conn->ibc_outstanding_credits <= IBNAL_MSG_QUEUE_SIZE);
916                 LASSERT (conn->ibc_credits >= 0);
917                 LASSERT (conn->ibc_credits <= IBNAL_MSG_QUEUE_SIZE);
918
919                 /* Not on ibc_rdma_queue */
920                 LASSERT (!tx->tx_passive_rdma_wait);
921
922                 if (conn->ibc_nsends_posted == IBNAL_MSG_QUEUE_SIZE)
923                         GOTO(out, 0);
924
925                 if (conn->ibc_credits == 0)     /* no credits */
926                         GOTO(out, 1);
927                 
928                 if (conn->ibc_credits == 1 &&   /* last credit reserved for */
929                     conn->ibc_outstanding_credits == 0) /* giving back credits */
930                         GOTO(out, 2);
931
932                 list_del (&tx->tx_list);
933
934                 if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
935                     (!list_empty(&conn->ibc_tx_queue) ||
936                      conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER)) {
937                         /* redundant NOOP */
938                         spin_unlock_irqrestore(&conn->ibc_lock, flags);
939                         kibnal_tx_done(tx);
940                         spin_lock_irqsave(&conn->ibc_lock, flags);
941                         continue;
942                 }
943
944                 tx->tx_msg->ibm_credits = conn->ibc_outstanding_credits;
945                 conn->ibc_outstanding_credits = 0;
946
947                 conn->ibc_nsends_posted++;
948                 conn->ibc_credits--;
949
950                 /* we only get a tx completion for the final rdma op */ 
951                 tx->tx_sending = 0;
952                 tx->tx_passive_rdma_wait = tx->tx_passive_rdma;
953                 list_add (&tx->tx_list, &conn->ibc_active_txs);
954 #if IBNAL_CKSUM
955                 tx->tx_msg->ibm_cksum = 0;
956                 tx->tx_msg->ibm_cksum = kibnal_cksum(tx->tx_msg, tx->tx_msg->ibm_nob);
957                 CDEBUG(D_NET, "cksum %x, nob %d\n", tx->tx_msg->ibm_cksum, tx->tx_msg->ibm_nob);
958 #endif
959                 /* NB the gap between removing tx from the queue and sending it
960                  * allows message re-ordering to occur */
961
962                 LASSERT (tx->tx_nsp > 0);
963
964                 rc = -ECONNABORTED;
965                 nwork = 0;
966                 if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
967                         vv_return_t retval;                        
968
969                         tx->tx_status = 0;
970                         rc = 0;
971
972                         retval = vv_post_send_list(kibnal_data.kib_hca, conn->ibc_qp, tx->tx_nsp, tx->tx_wrq, vv_operation_type_send_rc);
973
974                         if (retval != 0) {
975                                 CERROR("post send failed with %d\n", retval);
976                                 rc = -ECONNABORTED;
977                                 break;
978                         }
979                         
980                         tx->tx_sending = tx->tx_nsp;
981                 }
982
983                 if (rc != 0) {
984                         /* NB credits are transferred in the actual
985                          * message, which can only be the last work item */
986                         conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
987                         conn->ibc_credits++;
988                         conn->ibc_nsends_posted--;
989
990                         tx->tx_status = rc;
991                         tx->tx_passive_rdma_wait = 0;
992
993                         /* TODO: I think this is buggy if vv_post_send_list failed. */
994                         done = (tx->tx_sending == 0);
995                         if (done)
996                                 list_del (&tx->tx_list);
997                         
998                         spin_unlock_irqrestore (&conn->ibc_lock, flags);
999                         
1000                         if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
1001                                 CERROR ("Error %d posting transmit to "LPX64"\n", 
1002                                         rc, conn->ibc_peer->ibp_nid);
1003                         else
1004                                 CDEBUG (D_NET, "Error %d posting transmit to "
1005                                         LPX64"\n", rc, conn->ibc_peer->ibp_nid);
1006
1007                         kibnal_close_conn (conn, rc);
1008
1009                         if (done)
1010                                 kibnal_tx_done (tx);
1011                         return;
1012                 }
1013                 
1014         }
1015
1016         EXIT;
1017 out:
1018         spin_unlock_irqrestore (&conn->ibc_lock, flags);
1019 }
1020
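/* Completion handler for send/RDMA work requests: whoever makes the tx idle
 * (this callback or the passive RDMA completion) gets to free it. */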
1021 static void
1022 kibnal_tx_callback (vv_wc_t *wc)
1023 {
1024         kib_tx_t     *tx = (kib_tx_t *)kibnal_wreqid2ptr(wc->wr_id);
1025         kib_conn_t   *conn;
1026         unsigned long flags;
1027         int           idle;
1028
1029         conn = tx->tx_conn;
1030         LASSERT (conn != NULL);
1031         LASSERT (tx->tx_sending != 0);
1032
1033         CDEBUG(D_NET, "conn %p tx %p [%d/%d]: %d\n", conn, tx,
1034                tx->tx_sending, tx->tx_nsp, wc->completion_status);
1035
1036         spin_lock_irqsave(&conn->ibc_lock, flags);
1037
1038         /* I could be racing with rdma completion.  Whoever makes 'tx' idle
1039          * gets to free it, which also drops its ref on 'conn'.  If it's
1040          * not me, then I take an extra ref on conn so it can't disappear
1041          * under me. */
1042
1043         tx->tx_sending--;
1044         idle = (tx->tx_sending == 0) &&         /* This is the final callback */
1045                 (!tx->tx_passive_rdma_wait);     /* Not waiting for RDMA completion */
1046         if (idle)
1047                 list_del(&tx->tx_list);
1048
1049         CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1050                conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1051                atomic_read (&conn->ibc_refcount));
1052         atomic_inc (&conn->ibc_refcount);
1053
1054         if (tx->tx_sending == 0)
1055                 conn->ibc_nsends_posted--;
1056
1057         if (wc->completion_status != vv_comp_status_success &&
1058             tx->tx_status == 0)
1059                 tx->tx_status = -ECONNABORTED;
1060
1061         spin_unlock_irqrestore(&conn->ibc_lock, flags);
1062
1063         if (idle)
1064                 kibnal_tx_done (tx);
1065
1066         if (wc->completion_status != vv_comp_status_success) {
1067                 CERROR ("Tx completion to "LPX64" failed: %d\n", 
1068                         conn->ibc_peer->ibp_nid, wc->completion_status);
1069                 kibnal_close_conn (conn, -ENETDOWN);
1070         } else {
1071                 /* can I shovel some more sends out the door? */
1072                 kibnal_check_sends(conn);
1073         }
1074
1075         kibnal_put_conn (conn);
1076 }
1077
1078 void 
1079 kibnal_ca_async_callback(vv_event_record_t ev)
1080 {
1081         /* XXX flesh out.  this seems largely for async errors */
1082         CERROR("type: %d, port: %d, data: "LPX64"\n", ev.event_type, ev.port_num, ev.type.data);
1083 }
1084
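/* Completion queue callback: drain the CQ, dispatching rx and tx completions,
 * then re-arm it; repeat until a poll after re-arming finds it empty. */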
1085 void
1086 kibnal_ca_callback (unsigned long unused_context)
1087 {
1088         vv_wc_t wc;
1089         int armed = 0;
1090         vv_return_t retval;
1091
1092         for(;;) {
1093
1094                 while (vv_poll_for_completion(kibnal_data.kib_hca, kibnal_data.kib_cq, &wc) == vv_return_ok) {
1095
1096                         /* We will need to rearm the CQ to avoid a potential race. */
1097                         armed = 0;
1098
1099                         if (kibnal_wreqid_is_rx(wc.wr_id))
1100                                 kibnal_rx_callback(&wc);
1101                         else
1102                                 kibnal_tx_callback(&wc);
1103                 }
1104
1105                 if (armed)
1106                         return;
1107                 
1108                 retval = vv_request_completion_notification(kibnal_data.kib_hca, kibnal_data.kib_cq, vv_next_solicit_unsolicit_event);
1109                 if (retval != 0) {
1110                         CERROR ("Failed to re-arm completion queue: %d\n", retval);
1111                         return;
1112                 }
1113
1114                 armed = 1;
1115         }
1116 }
1117
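/* Append a send work request for the message itself to the tx; a PUT_DONE
 * bundled after RDMA reads is fenced behind them. */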
1118 void
1119 kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob)
1120 {
1121         vv_scatgat_t *gl = &tx->tx_gl[tx->tx_nsp];
1122         vv_wr_t      *wrq = &tx->tx_wrq[tx->tx_nsp];
1123         int           fence;
1124         int           nob = offsetof (kib_msg_t, ibm_u) + body_nob;
1125
1126         LASSERT (tx->tx_nsp >= 0 && 
1127                  tx->tx_nsp < sizeof(tx->tx_wrq)/sizeof(tx->tx_wrq[0]));
1128         LASSERT (nob <= IBNAL_MSG_SIZE);
1129         
1130         tx->tx_msg->ibm_magic = IBNAL_MSG_MAGIC;
1131         tx->tx_msg->ibm_version = IBNAL_MSG_VERSION;
1132         tx->tx_msg->ibm_type = type;
1133 #if IBNAL_CKSUM
1134         tx->tx_msg->ibm_nob = nob;
1135 #endif
1136         /* Fence the message if it's bundled with an RDMA read */
1137         fence = (tx->tx_nsp > 0) &&
1138                 (type == IBNAL_MSG_PUT_DONE);
1139
1140         *gl = (vv_scatgat_t) {
1141                 .v_address = (void *)tx->tx_msg,
1142                 .length    = nob,
1143                 .l_key     = tx->l_key,
1144         };
1145
1146         wrq->wr_id =  kibnal_ptr2wreqid(tx, 0);
1147         wrq->completion_notification = 1;
1148         wrq->scatgat_list = gl;
1149         wrq->num_of_data_segments = 1;
1150         wrq->wr_type = vv_wr_send;
1151
1152         wrq->type.send.solicited_event = 1;
1153
1154         wrq->type.send.send_qp_type.rc_type.fance_indicator = fence;
1155
1156         tx->tx_nsp++;
1157 }
1158
1159 static void
1160 kibnal_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
1161 {
1162         unsigned long         flags;
1163
1164         spin_lock_irqsave(&conn->ibc_lock, flags);
1165
1166         kibnal_queue_tx_locked (tx, conn);
1167         
1168         spin_unlock_irqrestore(&conn->ibc_lock, flags);
1169         
1170         kibnal_check_sends(conn);
1171 }
1172
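/* Send a fully initialised tx to 'nid': queue it on an existing connection,
 * or park it on the peer and kick the connection daemon to establish one. */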
1173 static void
1174 kibnal_launch_tx (kib_tx_t *tx, ptl_nid_t nid)
1175 {
1176         unsigned long    flags;
1177         kib_peer_t      *peer;
1178         kib_conn_t      *conn;
1179         rwlock_t        *g_lock = &kibnal_data.kib_global_lock;
1180
1181         /* If I get here, I've committed to send, so I complete the tx with
1182          * failure on any problems */
1183         
1184         LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
1185         LASSERT (tx->tx_nsp > 0);               /* work items have been set up */
1186
1187         read_lock_irqsave(g_lock, flags);
1188         
1189         peer = kibnal_find_peer_locked (nid);
1190         if (peer == NULL) {
1191                 read_unlock_irqrestore(g_lock, flags);
1192                 tx->tx_status = -EHOSTUNREACH;
1193                 kibnal_tx_done (tx);
1194                 return;
1195         }
1196
1197         conn = kibnal_find_conn_locked (peer);
1198         if (conn != NULL) {
1199                 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1200                        conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1201                        atomic_read (&conn->ibc_refcount));
1202                 atomic_inc (&conn->ibc_refcount); /* 1 ref for the tx */
1203                 read_unlock_irqrestore(g_lock, flags);
1204                 
1205                 kibnal_queue_tx (tx, conn);
1206                 return;
1207         }
1208         
1209         /* Making one or more connections; I'll need a write lock... */
1210         read_unlock(g_lock);
1211         write_lock(g_lock);
1212
1213         peer = kibnal_find_peer_locked (nid);
1214         if (peer == NULL) {
1215                 write_unlock_irqrestore (g_lock, flags);
1216                 tx->tx_status = -EHOSTUNREACH;
1217                 kibnal_tx_done (tx);
1218                 return;
1219         }
1220
1221         conn = kibnal_find_conn_locked (peer);
1222         if (conn != NULL) {
1223                 /* Connection exists; queue message on it */
1224                 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1225                        conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1226                        atomic_read (&conn->ibc_refcount));
1227                 atomic_inc (&conn->ibc_refcount); /* 1 ref for the tx */
1228                 write_unlock_irqrestore (g_lock, flags);
1229                 
1230                 kibnal_queue_tx (tx, conn);
1231                 return;
1232         }
1233
1234         if (peer->ibp_connecting == 0) {
1235                 if (!time_after_eq(jiffies, peer->ibp_reconnect_time)) {
1236                         write_unlock_irqrestore (g_lock, flags);
1237                         tx->tx_status = -EHOSTUNREACH;
1238                         kibnal_tx_done (tx);
1239                         return;
1240                 }
1241         
1242                 peer->ibp_connecting = 1;
1243
1244                 kib_peer_addref(peer); /* extra ref for connd */
1245         
1246                 spin_lock (&kibnal_data.kib_connd_lock);
1247         
1248                 list_add_tail (&peer->ibp_connd_list,
1249                                &kibnal_data.kib_connd_peers);
1250                 wake_up (&kibnal_data.kib_connd_waitq);
1251         
1252                 spin_unlock (&kibnal_data.kib_connd_lock);
1253         }
1254         
1255         /* A connection is being established; queue the message... */
1256         list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);
1257
1258         write_unlock_irqrestore (g_lock, flags);
1259 }
1260
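/* Set up the local source/sink buffer for an RDMA the peer will perform,
 * and send the PUT/GET request message describing it. */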
1261 static ptl_err_t
1262 kibnal_start_passive_rdma (int type, ptl_nid_t nid,
1263                             lib_msg_t *libmsg, ptl_hdr_t *hdr)
1264 {
1265         int         nob = libmsg->md->length;
1266         kib_tx_t   *tx;
1267         kib_msg_t  *ibmsg;
1268         int         rc;
1269         vv_access_con_bit_mask_t access;
1270         
1271         LASSERT (type == IBNAL_MSG_PUT_RDMA || type == IBNAL_MSG_GET_RDMA);
1272         LASSERT (nob > 0);
1273         LASSERT (!in_interrupt());              /* Mapping could block */
1274
1275         access = vv_acc_l_mem_write | vv_acc_r_mem_write | vv_acc_r_mem_read | vv_acc_mem_bind;
1276
1277         tx = kibnal_get_idle_tx (1);           /* May block; caller is an app thread */
1278         LASSERT (tx != NULL);
1279
1280         if ((libmsg->md->options & PTL_MD_KIOV) == 0) 
1281                 rc = kibnal_map_iov (tx, access,
1282                                      libmsg->md->md_niov,
1283                                      libmsg->md->md_iov.iov,
1284                                      0, nob, 0);
1285         else
1286                 rc = kibnal_map_kiov (tx, access,
1287                                       libmsg->md->md_niov, 
1288                                       libmsg->md->md_iov.kiov,
1289                                       0, nob, 0);
1290
1291         if (rc != 0) {
1292                 CERROR ("Can't map RDMA for "LPX64": %d\n", nid, rc);
1293                 goto failed;
1294         }
1295         
1296         if (type == IBNAL_MSG_GET_RDMA) {
1297                 /* reply gets finalized when tx completes */
1298                 tx->tx_libmsg[1] = lib_create_reply_msg(&kibnal_lib, 
1299                                                         nid, libmsg);
1300                 if (tx->tx_libmsg[1] == NULL) {
1301                         CERROR ("Can't create reply for GET -> "LPX64"\n",
1302                                 nid);
1303                         rc = -ENOMEM;
1304                         goto failed;
1305                 }
1306         }
1307         
1308         tx->tx_passive_rdma = 1;
1309
1310         ibmsg = tx->tx_msg;
1311
1312         ibmsg->ibm_u.rdma.ibrm_hdr = *hdr;
1313         ibmsg->ibm_u.rdma.ibrm_cookie = tx->tx_passive_rdma_cookie;
1314         /* map_kiov already filled the rdma descs for the whole_mem case */
1315         if (!kibnal_whole_mem()) {
1316                 ibmsg->ibm_u.rdma.rd_key = tx->tx_md.md_rkey;
1317                 ibmsg->ibm_u.rdma.ibrm_desc[0].rd_addr = tx->tx_md.md_addr;
1318                 ibmsg->ibm_u.rdma.ibrm_desc[0].rd_nob = nob;
1319                 ibmsg->ibm_u.rdma.ibrm_num_descs = 1;
1320         }
1321
1322         kibnal_init_tx_msg (tx, type, 
1323                             kib_rdma_msg_len(ibmsg->ibm_u.rdma.ibrm_num_descs));
1324
1325         CDEBUG(D_NET, "Passive: %p cookie "LPX64", key %x, addr "
1326                LPX64", nob %d\n",
1327                tx, tx->tx_passive_rdma_cookie, tx->tx_md.md_rkey,
1328                tx->tx_md.md_addr, nob);
1329         
1330         /* libmsg gets finalized when tx completes. */
1331         tx->tx_libmsg[0] = libmsg;
1332
1333         kibnal_launch_tx(tx, nid);
1334         return (PTL_OK);
1335
1336  failed:
1337         tx->tx_status = rc;
1338         kibnal_tx_done (tx);
1339         return (PTL_FAIL);
1340 }
1341
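/* Perform the active side of an RDMA (read for PUT, write for GET) described
 * by the peer's request in 'rx', then send the matching completion message. */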
1342 void
1343 kibnal_start_active_rdma (int type, int status,
1344                            kib_rx_t *rx, lib_msg_t *libmsg, 
1345                            unsigned int niov,
1346                            struct iovec *iov, ptl_kiov_t *kiov,
1347                            size_t offset, size_t nob)
1348 {
1349         kib_msg_t    *rxmsg = rx->rx_msg;
1350         kib_msg_t    *txmsg;
1351         kib_tx_t     *tx;
1352         vv_access_con_bit_mask_t access;
1353         vv_wr_operation_t rdma_op;
1354         int           rc;
1355         __u32         i;
1356
1357         CDEBUG(D_NET, "type %d, status %d, niov %d, offset %d, nob %d\n",
1358                type, status, niov, offset, nob);
1359
1360         /* Called by scheduler */
1361         LASSERT (!in_interrupt ());
1362
1363         /* Either all pages or all vaddrs */
1364         LASSERT (!(kiov != NULL && iov != NULL));
1365
1366         /* No data if we're completing with failure */
1367         LASSERT (status == 0 || nob == 0);
1368
1369         LASSERT (type == IBNAL_MSG_GET_DONE ||
1370                  type == IBNAL_MSG_PUT_DONE);
1371
1372         /* Flag I'm completing the RDMA.  Even if I fail to send the
1373          * completion message, I will have tried my best so further
1374          * attempts shouldn't be tried. */
1375         LASSERT (!rx->rx_rdma);
1376         rx->rx_rdma = 1;
1377
1378         if (type == IBNAL_MSG_GET_DONE) {
1379                 access = 0;
1380                 rdma_op  = vv_wr_rdma_write;
1381                 LASSERT (rxmsg->ibm_type == IBNAL_MSG_GET_RDMA);
1382         } else {
1383                 access = vv_acc_l_mem_write;
1384                 rdma_op  = vv_wr_rdma_read;
1385                 LASSERT (rxmsg->ibm_type == IBNAL_MSG_PUT_RDMA);
1386         }
1387
1388         tx = kibnal_get_idle_tx (0);           /* Mustn't block */
1389         if (tx == NULL) {
1390                 CERROR ("tx descs exhausted on RDMA from "LPX64
1391                         " completing locally with failure\n",
1392                         rx->rx_conn->ibc_peer->ibp_nid);
1393                 lib_finalize (&kibnal_lib, NULL, libmsg, PTL_NO_SPACE);
1394                 return;
1395         }
1396         LASSERT (tx->tx_nsp == 0);
1397
1398         if (nob == 0) 
1399                 GOTO(init_tx, 0);
1400
1401         /* We actually need to transfer some data (the transfer
1402          * size could get truncated to zero when the incoming
1403          * message is matched) */
1404         if (kiov != NULL)
1405                 rc = kibnal_map_kiov (tx, access, niov, kiov, offset, nob, 1);
1406         else
1407                 rc = kibnal_map_iov (tx, access, niov, iov, offset, nob, 1);
1408         
1409         if (rc != 0) {
1410                 CERROR ("Can't map RDMA -> "LPX64": %d\n", 
1411                         rx->rx_conn->ibc_peer->ibp_nid, rc);
1412                 /* We'll skip the RDMA and complete with failure. */
1413                 status = rc;
1414                 nob = 0;
1415                 GOTO(init_tx, rc);
1416         } 
1417
1418         if (!kibnal_whole_mem()) {
1419                 tx->tx_msg->ibm_u.rdma.rd_key = tx->tx_md.md_lkey;
1420                 tx->tx_msg->ibm_u.rdma.ibrm_desc[0].rd_addr = tx->tx_md.md_addr;
1421                 tx->tx_msg->ibm_u.rdma.ibrm_desc[0].rd_nob = nob;
1422                 tx->tx_msg->ibm_u.rdma.ibrm_num_descs = 1;
1423         }
1424
1425         /* XXX ugh.  different page-sized hosts. */ 
1426         if (tx->tx_msg->ibm_u.rdma.ibrm_num_descs !=
1427             rxmsg->ibm_u.rdma.ibrm_num_descs) {
1428                 CERROR("tx descs (%u) != rx descs (%u)\n", 
1429                        tx->tx_msg->ibm_u.rdma.ibrm_num_descs,
1430                        rxmsg->ibm_u.rdma.ibrm_num_descs);
1431                 /* Skip the RDMA; rc is 0 on this path, so fail explicitly. */
1432                 status = -EINVAL;
1433                 nob = 0;
1434                 GOTO(init_tx, rc);
1435         }
1436
1437         /* map_kiov filled in the rdma descs which describe our side of the
1438          * rdma transfer. */
1439         /* ibrm_num_descs was verified in rx_callback */
1440         for(i = 0; i < rxmsg->ibm_u.rdma.ibrm_num_descs; i++) {
1441                 kib_rdma_desc_t *ldesc, *rdesc; /* local, remote */
1442                 vv_scatgat_t *ds = &tx->tx_gl[i];
1443                 vv_wr_t *wrq = &tx->tx_wrq[i];
1444
1445                 ldesc = &tx->tx_msg->ibm_u.rdma.ibrm_desc[i];
1446                 rdesc = &rxmsg->ibm_u.rdma.ibrm_desc[i];
1447
1448                 ds->v_address = (void *)(unsigned long)ldesc->rd_addr;
1449                 ds->length    = ldesc->rd_nob;
1450                 ds->l_key     = tx->tx_msg->ibm_u.rdma.rd_key;
1451
1452                 wrq->wr_id = kibnal_ptr2wreqid(tx, 0);
1453
1454 #if 0
1455                 /* only the last rdma post triggers tx completion */
1456                 if (i == rxmsg->ibm_u.rdma.ibrm_num_descs - 1)
1457                         wrq->completion_notification = 1;
1458                 else
1459                         wrq->completion_notification = 0;
1460
1461 #else
1462                 /* TODO: hack. Right now complete everything, else the
1463                  * driver will deadlock. This is less efficient than
1464                  * requesting a notification for only a few of the
1465                  * WQEs. */
1466                 wrq->completion_notification = 1;
1467 #endif
1468
1469                 wrq->scatgat_list = ds;
1470                 wrq->num_of_data_segments = 1;
1471                 wrq->wr_type = rdma_op;
1472
1473                 wrq->type.send.solicited_event = 0;
1474
1475                 wrq->type.send.send_qp_type.rc_type.fance_indicator = 0;
1476                 wrq->type.send.send_qp_type.rc_type.r_addr = rdesc->rd_addr;
1477                 wrq->type.send.send_qp_type.rc_type.r_r_key = rxmsg->ibm_u.rdma.rd_key;
1478
1479                 CDEBUG(D_NET, "prepared RDMA with r_addr=%llx r_key=%x\n",
1480                        wrq->type.send.send_qp_type.rc_type.r_addr,
1481                        wrq->type.send.send_qp_type.rc_type.r_r_key);
1482
1483                 tx->tx_nsp++;
1484         }
1485
1486 init_tx:
1487         txmsg = tx->tx_msg;
1488
1489         txmsg->ibm_u.completion.ibcm_cookie = rxmsg->ibm_u.rdma.ibrm_cookie;
1490         txmsg->ibm_u.completion.ibcm_status = status;
1491         
1492         kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));
1493
1494         if (status == 0 && nob != 0) {
1495                 LASSERT (tx->tx_nsp > 1);
1496                 /* RDMA: libmsg gets finalized when the tx completes.  This
1497                  * is after the completion message has been sent, which in
1498                  * turn is after the RDMA has finished. */
1499                 tx->tx_libmsg[0] = libmsg;
1500         } else {
1501                 LASSERT (tx->tx_nsp == 1);
1502                 /* No RDMA: local completion happens now! */
1503                 CDEBUG(D_WARNING,"No data: immediate completion\n");
1504                 lib_finalize (&kibnal_lib, NULL, libmsg,
1505                               status == 0 ? PTL_OK : PTL_FAIL);
1506         }
1507
1508         /* +1 ref for this tx... */
1509         CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1510                rx->rx_conn, rx->rx_conn->ibc_state, 
1511                rx->rx_conn->ibc_peer->ibp_nid,
1512                atomic_read (&rx->rx_conn->ibc_refcount));
1513         atomic_inc (&rx->rx_conn->ibc_refcount);
1514         /* ...and queue it up */
1515         kibnal_queue_tx(tx, rx->rx_conn);
1516 }
1517
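/* Common send path for kibnal_send() and kibnal_send_pages().  Payloads that
 * fit in an IBNAL_MSG_SIZE message are copied into the tx buffer and sent as
 * IBNAL_MSG_IMMEDIATE; PUTs and GETs whose data wouldn't fit are converted
 * into passive RDMAs via kibnal_start_passive_rdma(), and REPLYs to a
 * GET_RDMA are completed with kibnal_start_active_rdma(). */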
1518 static ptl_err_t
1519 kibnal_sendmsg(lib_nal_t    *nal, 
1520                 void         *private,
1521                 lib_msg_t    *libmsg,
1522                 ptl_hdr_t    *hdr, 
1523                 int           type, 
1524                 ptl_nid_t     nid, 
1525                 ptl_pid_t     pid,
1526                 unsigned int  payload_niov, 
1527                 struct iovec *payload_iov, 
1528                 ptl_kiov_t   *payload_kiov,
1529                 size_t        payload_offset,
1530                 size_t        payload_nob)
1531 {
1532         kib_msg_t  *ibmsg;
1533         kib_tx_t   *tx;
1534         int         nob;
1535
1536         /* NB 'private' is different depending on what we're sending.... */
1537
1538         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
1539                " pid %d\n", payload_nob, payload_niov, nid, pid);
1540
1541         LASSERT (payload_nob == 0 || payload_niov > 0);
1542         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1543
1544         /* Thread context if we're sending payload */
1545         LASSERT (!in_interrupt() || payload_niov == 0);
1546         /* payload is either all vaddrs or all pages */
1547         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1548
1549         switch (type) {
1550         default:
1551                 LBUG();
1552                 return (PTL_FAIL);
1553                 
1554         case PTL_MSG_REPLY: {
1555                 /* reply's 'private' is the incoming receive */
1556                 kib_rx_t *rx = private;
1557
1558                 /* RDMA reply expected? */
1559                 if (rx->rx_msg->ibm_type == IBNAL_MSG_GET_RDMA) {
1560                         kibnal_start_active_rdma(IBNAL_MSG_GET_DONE, 0,
1561                                                  rx, libmsg, payload_niov, 
1562                                                  payload_iov, payload_kiov,
1563                                                  payload_offset, payload_nob);
1564                         return (PTL_OK);
1565                 }
1566                 
1567                 /* Incoming message consistent with immediate reply? */
1568                 if (rx->rx_msg->ibm_type != IBNAL_MSG_IMMEDIATE) {
1569                         CERROR ("REPLY to "LPX64" bad msg type %d!!!\n",
1570                                 nid, rx->rx_msg->ibm_type);
1571                         return (PTL_FAIL);
1572                 }
1573
1574                 /* Will it fit in a message? */
1575                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1576                 if (nob > IBNAL_MSG_SIZE) {
1577                         CERROR("REPLY for "LPX64" too big (RDMA not requested): "LPSZ" (max for message is %d)\n",
1578                                nid, payload_nob, IBNAL_MSG_SIZE);
1579                         return (PTL_FAIL);
1580                 }
1581                 break;
1582         }
1583
1584         case PTL_MSG_GET:
1585                 /* might the REPLY message be big enough to need RDMA? */
1586                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[libmsg->md->length]);
1587                 if (nob > IBNAL_MSG_SIZE)
1588                         return (kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA, 
1589                                                           nid, libmsg, hdr));
1590                 break;
1591
1592         case PTL_MSG_ACK:
1593                 LASSERT (payload_nob == 0);
1594                 break;
1595
1596         case PTL_MSG_PUT:
1597                 /* Is the payload big enough to need RDMA? */
1598                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1599                 if (nob > IBNAL_MSG_SIZE)
1600                         return (kibnal_start_passive_rdma(IBNAL_MSG_PUT_RDMA,
1601                                                           nid, libmsg, hdr));
1602                 
1603                 break;
1604         }
1605
1606         tx = kibnal_get_idle_tx(!(type == PTL_MSG_ACK ||
1607                                   type == PTL_MSG_REPLY ||
1608                                   in_interrupt()));
1609         if (tx == NULL) {
1610                 CERROR ("Can't send %d to "LPX64": tx descs exhausted%s\n", 
1611                         type, nid, in_interrupt() ? " (intr)" : "");
1612                 return (PTL_NO_SPACE);
1613         }
1614
1615         ibmsg = tx->tx_msg;
1616         ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1617
1618         if (payload_nob > 0) {
1619                 if (payload_kiov != NULL)
1620                         lib_copy_kiov2buf(ibmsg->ibm_u.immediate.ibim_payload,
1621                                           payload_niov, payload_kiov,
1622                                           payload_offset, payload_nob);
1623                 else
1624                         lib_copy_iov2buf(ibmsg->ibm_u.immediate.ibim_payload,
1625                                          payload_niov, payload_iov,
1626                                          payload_offset, payload_nob);
1627         }
1628
1629         kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE,
1630                             offsetof(kib_immediate_msg_t, 
1631                                      ibim_payload[payload_nob]));
1632
1633         /* libmsg gets finalized when tx completes */
1634         tx->tx_libmsg[0] = libmsg;
1635
1636         kibnal_launch_tx(tx, nid);
1637         return (PTL_OK);
1638 }
1639
1640 static ptl_err_t
1641 kibnal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
1642                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1643                unsigned int payload_niov, struct iovec *payload_iov,
1644                size_t payload_offset, size_t payload_len)
1645 {
1646         CDEBUG(D_NET, "  pid = %d, nid="LPU64"\n",
1647                pid, nid);
1648         return (kibnal_sendmsg(nal, private, cookie,
1649                                hdr, type, nid, pid,
1650                                payload_niov, payload_iov, NULL,
1651                                payload_offset, payload_len));
1652 }
1653
1654 static ptl_err_t
1655 kibnal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, 
1656                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1657                      unsigned int payload_niov, ptl_kiov_t *payload_kiov, 
1658                      size_t payload_offset, size_t payload_len)
1659 {
1660         return (kibnal_sendmsg(nal, private, cookie,
1661                                hdr, type, nid, pid,
1662                                payload_niov, NULL, payload_kiov,
1663                                payload_offset, payload_len));
1664 }
1665
1666 static ptl_err_t
1667 kibnal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
1668                  unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
1669                  size_t offset, size_t mlen, size_t rlen)
1670 {
1671         kib_rx_t    *rx = private;
1672         kib_msg_t   *rxmsg = rx->rx_msg;
1673         int          msg_nob;
1674         
1675         LASSERT (mlen <= rlen);
1676         LASSERT (!in_interrupt ());
1677         /* Either all pages or all vaddrs */
1678         LASSERT (!(kiov != NULL && iov != NULL));
1679
1680         switch (rxmsg->ibm_type) {
1681         default:
1682                 LBUG();
1683                 return (PTL_FAIL);
1684                 
1685         case IBNAL_MSG_IMMEDIATE:
1686                 msg_nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
1687                 if (msg_nob > IBNAL_MSG_SIZE) {
1688                         CERROR ("Immediate message from "LPX64" too big: "LPSZ"\n",
1689                                 rxmsg->ibm_u.immediate.ibim_hdr.src_nid, rlen);
1690                         return (PTL_FAIL);
1691                 }
1692
1693                 if (kiov != NULL)
1694                         lib_copy_buf2kiov(niov, kiov, offset,
1695                                           rxmsg->ibm_u.immediate.ibim_payload,
1696                                           mlen);
1697                 else
1698                         lib_copy_buf2iov(niov, iov, offset,
1699                                          rxmsg->ibm_u.immediate.ibim_payload,
1700                                          mlen);
1701
1702                 lib_finalize (nal, NULL, libmsg, PTL_OK);
1703                 return (PTL_OK);
1704
1705         case IBNAL_MSG_GET_RDMA:
1706                 /* We get called here just to discard any junk after the
1707                  * GET hdr. */
1708                 LASSERT (libmsg == NULL);
1709                 lib_finalize (nal, NULL, libmsg, PTL_OK);
1710                 return (PTL_OK);
1711
1712         case IBNAL_MSG_PUT_RDMA:
1713                 kibnal_start_active_rdma (IBNAL_MSG_PUT_DONE, 0,
1714                                           rx, libmsg, 
1715                                           niov, iov, kiov, offset, mlen);
1716                 return (PTL_OK);
1717         }
1718 }
1719
1720 static ptl_err_t
1721 kibnal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
1722               unsigned int niov, struct iovec *iov, 
1723               size_t offset, size_t mlen, size_t rlen)
1724 {
1725         return (kibnal_recvmsg (nal, private, msg, niov, iov, NULL,
1726                                 offset, mlen, rlen));
1727 }
1728
1729 static ptl_err_t
1730 kibnal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
1731                      unsigned int niov, ptl_kiov_t *kiov, 
1732                      size_t offset, size_t mlen, size_t rlen)
1733 {
1734         return (kibnal_recvmsg (nal, private, msg, niov, NULL, kiov,
1735                                 offset, mlen, rlen));
1736 }
1737
1738 /*****************************************************************************
1739  * The rest of this file concerns connection management.  Active connections
1740  * start with kibnal_connect_peer, passive connections start with
1741  * kibnal_listen_callback.  Active disconnects start with kibnal_close_conn;
1742  * kibnal_cm_callback starts passive disconnects and contains the guts of how
1743  * the disconnect state machine progresses.
1744  *****************************************************************************/
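
/* A rough sketch of the IBNAL_CONN_* state progression driven below:
 *
 *   INIT_QP -> CONNECTING -> ESTABLISHED -> SEND_DREQ -> DREP -> DISCONNECTED
 *
 * Failed connection attempts either drop back to INIT_QP (if the CM never got
 * properly involved) or are handed to kibnal_close_conn_locked() to run the
 * disconnect protocol. */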
1745
1746 int
1747 kibnal_thread_start (int (*fn)(void *arg), void *arg)
1748 {
1749         long    pid = kernel_thread (fn, arg, 0);
1750
1751         if (pid < 0)
1752                 return ((int)pid);
1753
1754         atomic_inc (&kibnal_data.kib_nthreads);
1755         return (0);
1756 }
1757
1758 static void
1759 kibnal_thread_fini (void)
1760 {
1761         atomic_dec (&kibnal_data.kib_nthreads);
1762 }
1763
1764 /* this can be called by anyone at any time to close a connection.  if
1765  * the connection is still established it heads to the connd to start
1766  * the disconnection in a safe context.  It has no effect if called
1767  * on a connection that is already disconnecting */
1768 void
1769 kibnal_close_conn_locked (kib_conn_t *conn, int error)
1770 {
1771         /* This just does the immediate housekeeping, and schedules the
1772          * connection for the connd to finish off.
1773          * Caller holds kib_global_lock exclusively in irq context */
1774         kib_peer_t   *peer = conn->ibc_peer;
1775
1776         KIB_ASSERT_CONN_STATE_RANGE(conn, IBNAL_CONN_CONNECTING,
1777                                     IBNAL_CONN_DISCONNECTED);
1778
1779         if (conn->ibc_state > IBNAL_CONN_ESTABLISHED)
1780                 return; /* already disconnecting */
1781
1782         CDEBUG (error == 0 ? D_NET : D_ERROR,
1783                 "closing conn to "LPX64": error %d\n", peer->ibp_nid, error);
1784
1785         if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
1786                 /* kib_connd_conns takes ibc_list's ref */
1787                 list_del (&conn->ibc_list);
1788         } else {
1789                 /* new ref for kib_connd_conns */
1790                 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1791                        conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1792                        atomic_read (&conn->ibc_refcount));
1793                 atomic_inc (&conn->ibc_refcount);
1794         }
1795         
1796         if (list_empty (&peer->ibp_conns) &&
1797             peer->ibp_persistence == 0) {
1798                 /* Non-persistent peer with no more conns... */
1799                 kibnal_unlink_peer_locked (peer);
1800         }
1801
1802         conn->ibc_state = IBNAL_CONN_SEND_DREQ;
1803
1804         spin_lock (&kibnal_data.kib_connd_lock);
1805
1806         list_add_tail (&conn->ibc_list, &kibnal_data.kib_connd_conns);
1807         wake_up (&kibnal_data.kib_connd_waitq);
1808                 
1809         spin_unlock (&kibnal_data.kib_connd_lock);
1810 }
1811
1812 void
1813 kibnal_close_conn (kib_conn_t *conn, int error)
1814 {
1815         unsigned long     flags;
1816
1817         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1818
1819         kibnal_close_conn_locked (conn, error);
1820         
1821         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1822 }
1823
1824 static void
1825 kibnal_peer_connect_failed (kib_peer_t *peer, int active, int rc)
1826 {
1827         LIST_HEAD        (zombies);
1828         kib_tx_t         *tx;
1829         unsigned long     flags;
1830
1831         LASSERT (rc != 0);
1832         LASSERT (peer->ibp_reconnect_interval >= IBNAL_MIN_RECONNECT_INTERVAL);
1833
1834         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1835
1836         LASSERT (peer->ibp_connecting != 0);
1837         peer->ibp_connecting--;
1838         if (peer->ibp_connecting != 0) {
1839                 /* another connection attempt under way (loopback?)... */
1840                 write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1841                 return;
1842         }
1843
1844         if (list_empty(&peer->ibp_conns)) {
1845                 /* Say when active connection can be re-attempted */
1846                 peer->ibp_reconnect_time = jiffies + peer->ibp_reconnect_interval;
1847                 /* Increase reconnection interval */
1848                 peer->ibp_reconnect_interval = MIN (peer->ibp_reconnect_interval * 2,
1849                                                     IBNAL_MAX_RECONNECT_INTERVAL);
1850         
1851                 /* Take peer's blocked transmits; I'll complete
1852                  * them with error */
1853                 while (!list_empty (&peer->ibp_tx_queue)) {
1854                         tx = list_entry (peer->ibp_tx_queue.next,
1855                                          kib_tx_t, tx_list);
1856                         
1857                         list_del (&tx->tx_list);
1858                         list_add_tail (&tx->tx_list, &zombies);
1859                 }
1860                 
1861                 if (kibnal_peer_active(peer) &&
1862                     (peer->ibp_persistence == 0)) {
1863                         /* failed connection attempt on non-persistent peer */
1864                         kibnal_unlink_peer_locked (peer);
1865                 }
1866         } else {
1867                 /* Can't have blocked transmits if there are connections */
1868                 LASSERT (list_empty(&peer->ibp_tx_queue));
1869         }
1870         
1871         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1872
1873         if (!list_empty (&zombies))
1874                 CERROR ("Deleting messages for "LPX64": connection failed\n",
1875                         peer->ibp_nid);
1876
1877         while (!list_empty (&zombies)) {
1878                 tx = list_entry (zombies.next, kib_tx_t, tx_list);
1879
1880                 list_del (&tx->tx_list);
1881                 /* complete now */
1882                 tx->tx_status = -EHOSTUNREACH;
1883                 kibnal_tx_done (tx);
1884         }
1885 }
1886
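/* kibnal_connreq_done() is the single completion point for both active and
 * passive connection attempts.  On success it marks the conn ESTABLISHED,
 * requeues any transmits that were blocked on the peer, posts all the receive
 * buffers and kicks kibnal_check_sends(); on failure it either schedules the
 * conn for the connd to close or just lets the remaining refs drain. */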
1887 static void
1888 kibnal_connreq_done (kib_conn_t *conn, int active, int status)
1889 {
1890         int               state = conn->ibc_state;
1891         kib_peer_t       *peer = conn->ibc_peer;
1892         kib_tx_t         *tx;
1893         unsigned long     flags;
1894         int               i;
1895
1896         CDEBUG(D_NET, "Enter kibnal_connreq_done for conn=%p, active=%d, status=%d\n",
1897                conn, active, status);
1898
1899         /* passive connection has no connreq & vice versa */
1900         LASSERTF(!active == !(conn->ibc_connreq != NULL),
1901                  "%d %p\n", active, conn->ibc_connreq);
1902
1903         if (active) {
1904                 PORTAL_FREE (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
1905                 conn->ibc_connreq = NULL;
1906         }
1907
1908         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1909
1910         LASSERT (peer->ibp_connecting != 0);
1911         
1912         if (status == 0) {                         
1913                 /* connection established... */
1914                 KIB_ASSERT_CONN_STATE(conn, IBNAL_CONN_CONNECTING);
1915                 conn->ibc_state = IBNAL_CONN_ESTABLISHED;
1916
1917                 if (!kibnal_peer_active(peer)) {
1918                         /* ...but peer deleted meantime */
1919                         status = -ECONNABORTED;
1920                 }
1921         } else {
1922                 KIB_ASSERT_CONN_STATE_RANGE(conn, IBNAL_CONN_INIT_QP,
1923                                             IBNAL_CONN_CONNECTING);
1924         }
1925
1926         if (status == 0) {
1927                 /* Everything worked! */
1928
1929                 peer->ibp_connecting--;
1930
1931                 /* +1 ref for ibc_list; caller(== CM)'s ref remains until
1932                  * the IB_CM_IDLE callback */
1933                 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1934                        conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1935                        atomic_read (&conn->ibc_refcount));
1936                 atomic_inc (&conn->ibc_refcount);
1937                 list_add (&conn->ibc_list, &peer->ibp_conns);
1938                 
1939                 /* reset reconnect interval for next attempt */
1940                 peer->ibp_reconnect_interval = IBNAL_MIN_RECONNECT_INTERVAL;
1941
1942                 /* post blocked sends to the new connection */
1943                 spin_lock (&conn->ibc_lock);
1944                 
1945                 while (!list_empty (&peer->ibp_tx_queue)) {
1946                         tx = list_entry (peer->ibp_tx_queue.next, 
1947                                          kib_tx_t, tx_list);
1948                         
1949                         list_del (&tx->tx_list);
1950
1951                         /* +1 ref for each tx */
1952                         CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1953                                conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1954                                atomic_read (&conn->ibc_refcount));
1955                         atomic_inc (&conn->ibc_refcount);
1956                         kibnal_queue_tx_locked (tx, conn);
1957                 }
1958                 
1959                 spin_unlock (&conn->ibc_lock);
1960
1961                 /* Nuke any dangling conns from a different peer instance... */
1962                 kibnal_close_stale_conns_locked (conn->ibc_peer,
1963                                                  conn->ibc_incarnation);
1964
1965                 write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1966
1967                 /* queue up all the receives */
1968                 for (i = 0; i < IBNAL_RX_MSGS; i++) {
1969                         /* +1 ref for rx desc */
1970                         CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1971                                conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1972                                atomic_read (&conn->ibc_refcount));
1973                         atomic_inc (&conn->ibc_refcount);
1974
1975                         CDEBUG(D_NET, "RX[%d] %p->%p\n",
1976                                i, &conn->ibc_rxs[i], conn->ibc_rxs[i].rx_msg);
1977
1978                         kibnal_post_rx (&conn->ibc_rxs[i], 0);
1979                 }
1980
1981                 kibnal_check_sends (conn);
1982                 return;
1983         }
1984
1985         /* connection failed */
1986         if (state == IBNAL_CONN_CONNECTING) {
1987                 /* schedule for connd to close */
1988                 kibnal_close_conn_locked (conn, status);
1989         } else {
1990                 /* Don't have a CM comm_id; just wait for refs to drain */
1991                 conn->ibc_state = IBNAL_CONN_DISCONNECTED;
1992         } 
1993
1994         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1995
1996         kibnal_peer_connect_failed (conn->ibc_peer, active, status);
1997
1998         /* If we didn't establish the connection we don't have to pass
1999          * through the disconnect protocol before dropping the CM ref */
2000         if (state < IBNAL_CONN_CONNECTING) 
2001                 kibnal_put_conn (conn);
2002 }
2003
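/* Build a conn for an incoming connection request: check the advertised queue
 * depth, then find (or create) the peer for 'nid' and take the references the
 * new conn will hold on it. */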
2004 static int
2005 kibnal_accept (kib_conn_t **connp, cm_cep_handle_t *cep,
2006                 ptl_nid_t nid, __u64 incarnation, int queue_depth)
2007 {
2008         kib_conn_t    *conn = kibnal_create_conn();
2009         kib_peer_t    *peer;
2010         kib_peer_t    *peer2;
2011         unsigned long  flags;
2012
2013         if (conn == NULL)
2014                 return (-ENOMEM);
2015
2016         if (queue_depth != IBNAL_MSG_QUEUE_SIZE) {
2017                 CERROR("Can't accept "LPX64": bad queue depth %d (%d expected)\n",
2018                        nid, queue_depth, IBNAL_MSG_QUEUE_SIZE);
2019                 atomic_dec (&conn->ibc_refcount);
2020                 kibnal_destroy_conn(conn);
2021                 return (-EPROTO);
2022         }
2023         
2024         /* assume 'nid' is a new peer */
2025         peer = kibnal_create_peer (nid);
2026         if (peer == NULL) {
2027                 /* NB conn->ibc_peer isn't set yet, so don't dereference it here */
2028                 CDEBUG(D_NET, "--conn[%p] state %d (%d)\n",
2029                        conn, conn->ibc_state, atomic_read (&conn->ibc_refcount));
2030                 atomic_dec (&conn->ibc_refcount);
2031                 kibnal_destroy_conn(conn);
2032                 return (-ENOMEM);
2033         }
2034         
2035         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
2036
2037         peer2 = kibnal_find_peer_locked(nid);
2038         if (peer2 == NULL) {
2039                 /* peer table takes my ref on peer */
2040                 list_add_tail (&peer->ibp_list, kibnal_nid2peerlist(nid));
2041         } else {
2042                 kib_peer_decref (peer);
2043                 peer = peer2;
2044         }
2045
2046         kib_peer_addref(peer); /* +1 ref for conn */
2047         peer->ibp_connecting++;
2048
2049         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
2050
2051         conn->ibc_peer = peer;
2052         conn->ibc_state = IBNAL_CONN_CONNECTING;
2053         /* conn->ibc_cep is set when cm_accept is called */
2054         conn->ibc_incarnation = incarnation;
2055         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2056
2057         *connp = conn;
2058         return (0);
2059 }
2060
2061 static void kibnal_move_qp_to_error(kib_conn_t *conn)
2062 {
2063         vv_qp_attr_t qp_attr;
2064         vv_return_t retval;
2065
2066         qp_attr.modify.qp_modify_into_state = vv_qp_state_error;
2067         qp_attr.modify.vv_qp_attr_mask      = VV_QP_AT_STATE;
2068         qp_attr.modify.qp_type              = vv_qp_type_r_conn;
2069
2070         retval = vv_qp_modify(kibnal_data.kib_hca, conn->ibc_qp, &qp_attr, &conn->ibc_qp_attrs);
2071         if (retval)
2072                 CERROR("couldn't move qp into error state, error %d\n", retval);
2073 }
2074
2075 static void kibnal_flush_pending(kib_conn_t *conn)
2076 {
2077         LIST_HEAD        (zombies); 
2078         struct list_head *tmp;
2079         struct list_head *nxt;
2080         kib_tx_t         *tx;
2081         unsigned long     flags;
2082         int               done;
2083
2084         /* NB we wait until the connection has closed before completing
2085          * outstanding passive RDMAs so we can be sure the network can't 
2086          * touch the mapped memory any more. */
2087         KIB_ASSERT_CONN_STATE(conn, IBNAL_CONN_DISCONNECTED);
2088
2089         /* set the QP to the error state so that we get flush callbacks
2090          * on our posted receives which can then drop their conn refs */
2091         kibnal_move_qp_to_error(conn);
2092
2093         spin_lock_irqsave (&conn->ibc_lock, flags);
2094
2095         /* grab passive RDMAs not waiting for the tx callback */
2096         list_for_each_safe (tmp, nxt, &conn->ibc_active_txs) {
2097                 tx = list_entry (tmp, kib_tx_t, tx_list);
2098
2099                 LASSERT (tx->tx_passive_rdma ||
2100                          !tx->tx_passive_rdma_wait);
2101
2102                 LASSERT (tx->tx_passive_rdma_wait ||
2103                          tx->tx_sending != 0);
2104
2105                 /* still waiting for tx callback? */
2106                 if (!tx->tx_passive_rdma_wait)
2107                         continue;
2108
2109                 tx->tx_status = -ECONNABORTED;
2110                 tx->tx_passive_rdma_wait = 0;
2111                 done = (tx->tx_sending == 0);
2112
2113                 if (!done)
2114                         continue;
2115
2116                 list_del (&tx->tx_list);
2117                 list_add (&tx->tx_list, &zombies);
2118         }
2119
2120         /* grab all blocked transmits */
2121         list_for_each_safe (tmp, nxt, &conn->ibc_tx_queue) {
2122                 tx = list_entry (tmp, kib_tx_t, tx_list);
2123                 
2124                 list_del (&tx->tx_list);
2125                 list_add (&tx->tx_list, &zombies);
2126         }
2127         
2128         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2129
2130         while (!list_empty(&zombies)) {
2131                 tx = list_entry (zombies.next, kib_tx_t, tx_list);
2132
2133                 list_del(&tx->tx_list);
2134                 kibnal_tx_done (tx);
2135         }
2136 }
2137
2138 static void
2139 kibnal_reject (cm_cep_handle_t cep, cm_rej_code_t reason)
2140 {
2141         cm_reject_data_t *rej;
2142
2143         PORTAL_ALLOC(rej, sizeof(*rej));
2144         if (rej == NULL) /* PORTAL_ALLOC() will CERROR on failure */
2145                 return;  
2146
2147         rej->reason = reason;
2148         cm_reject(cep, rej);
2149         PORTAL_FREE(rej, sizeof(*rej));
2150 }
2151
2152 static void get_av_from_path(ib_path_record_v2_t *path, vv_add_vec_t *av)
2153 {
2154         av->service_level = path->sl;
2155         av->grh_flag = 0;       /* TODO: correct? */
2156         av->dlid = path->dlid;
2157         av->pmtu = path->mtu;
2158
2159         /* From sdp-hca-params.h. */
2160         switch(path->rate) {
2161         case 2:
2162                 av->max_static_rate = 1;
2163                 break;
2164         case 3:
2165         case 4:
2166         default:
2167                 av->max_static_rate = 0;
2168                 break;
2169         }
2170
2171         av->l_ack_timeout = IBNAL_ACK_TIMEOUT;
2172         av->retry_count = IBNAL_RETRY;
2173         av->rnr_retry_count = IBNAL_RNR_RETRY; 
2174         av->source_path_bit = 0;
2175
2176         av->global_dest.flow_lable = path->flow_label;
2177         av->global_dest.hope_limit = path->hop_limut;
2178         av->global_dest.traffic_class = path->traffic_class;
2179         av->global_dest.s_gid_index = 0;
2180         av->global_dest.d_gid = path->dgid;
2181 }
2182
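/* Bring a QP to RTS: the first vv_qp_modify() moves it to RTR using the
 * remote QPN/PSN and an address vector derived from the path record, and the
 * second moves it to RTS with the retry/timeout parameters.  Used on both the
 * active and passive sides of connection establishment. */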
2183 static vv_return_t
2184 kibnal_qp_rts(vv_qp_h_t qp_handle, __u32 qpn, __u8 resp_res, 
2185               ib_path_record_v2_t *path, __u8 init_depth, __u32 send_psn)
2186 {
2187         vv_qp_attr_t qp_attr;
2188         vv_return_t retval;
2189
2190         ENTRY;
2191
2192 #if 1
2193         /* TODO - Hack. I don't know whether I get bad values from the
2194          * stack or if I'm using the wrong names. */
2195         resp_res = 8;
2196         init_depth = 8;
2197 #endif
2198
2199         /* RTR */
2200         qp_attr.modify.qp_modify_into_state = vv_qp_state_rtr;
2201         qp_attr.modify.vv_qp_attr_mask =
2202                 VV_QP_AT_STATE | 
2203                 VV_QP_AT_ADD_VEC |
2204                 VV_QP_AT_DEST_QP |
2205                 VV_QP_AT_R_PSN |
2206                 VV_QP_AT_RESP_RDMA_ATOM_OUT_NUM |
2207                 VV_QP_AT_MIN_RNR_NAK_T | VV_QP_AT_OP_F;
2208
2209         qp_attr.modify.qp_type = vv_qp_type_r_conn;
2210
2211         get_av_from_path(path, &qp_attr.modify.params.rtr.remote_add_vec);
2212         qp_attr.modify.params.rtr.destanation_qp = qpn;
2213         qp_attr.modify.params.rtr.receive_psn = IBNAL_STARTING_PSN;
2214         qp_attr.modify.params.rtr.responder_rdma_r_atom_num = resp_res;
2215         qp_attr.modify.params.rtr.opt_min_rnr_nak_timer = 16; /* 20 ms */
2216
2217         /* For now, force MTU to 1KB (Voltaire's advice). */
2218         qp_attr.modify.params.rtr.remote_add_vec.pmtu = vv_mtu_1024;
2219
2220         retval = vv_qp_modify(kibnal_data.kib_hca, qp_handle, &qp_attr, NULL);
2221         if (retval) {
2222                 CERROR("Cannot modify QP to RTR: %d\n", retval);
2223                 RETURN(retval);
2224         }
2225
2226         /* RTS */
2227         qp_attr.modify.qp_modify_into_state = vv_qp_state_rts;
2228         qp_attr.modify.vv_qp_attr_mask = 
2229                 VV_QP_AT_STATE |
2230                 VV_QP_AT_L_ACK_T |
2231                 VV_QP_AT_RETRY_NUM |
2232                 VV_QP_AT_RNR_NUM |
2233                 VV_QP_AT_S_PSN |
2234                 VV_QP_AT_DEST_RDMA_ATOM_OUT_NUM;
2235         qp_attr.modify.qp_type = vv_qp_type_r_conn;             
2236
2237         qp_attr.modify.params.rts.local_ack_timeout = path->pkt_life_time + 2; /* 2 or 1? */ 
2238         qp_attr.modify.params.rts.retry_num = IBNAL_RETRY;
2239         qp_attr.modify.params.rts.rnr_num = IBNAL_RNR_RETRY;
2240         qp_attr.modify.params.rts.send_psn = send_psn;
2241         qp_attr.modify.params.rts.dest_out_rdma_r_atom_num = init_depth;
2242         qp_attr.modify.params.rts.flow_control = 1; /* Stack does not use it. */
2243
2244         retval = vv_qp_modify(kibnal_data.kib_hca, qp_handle, &qp_attr, NULL);
2245         if (retval) {
2246                 CERROR("Cannot modify QP to RTS: %d\n", retval);
2247         }
2248
2249         RETURN(retval);
2250 }
2251
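/* Active side: handle the CM REP.  Validate the wire connreq carried in the
 * reply's private data, bring our QP to RTS against the replier's QPN/PSN,
 * then send the RTU via cm_accept() and complete the connection. */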
2252 static void
2253 kibnal_connect_reply (cm_cep_handle_t cep, cm_conn_data_t *info, kib_conn_t *conn)
2254 {
2255         vv_hca_attrib_t *ca_attr = &kibnal_data.kib_hca_attrs;
2256         kib_wire_connreq_t *wcr;
2257         cm_reply_data_t *rep = &info->data.reply;
2258         cm_rej_code_t reason;
2259         vv_return_t retval;
2260
2261         wcr = (kib_wire_connreq_t *)info->data.reply.priv_data;
2262
2263         if (wcr->wcr_magic != cpu_to_le32(IBNAL_MSG_MAGIC)) {
2264                 CERROR ("Can't connect "LPX64": bad magic %08x\n",
2265                         conn->ibc_peer->ibp_nid, le32_to_cpu(wcr->wcr_magic));
2266                 GOTO(reject, reason = cm_rej_code_usr_rej);
2267         }
2268         
2269         if (wcr->wcr_version != cpu_to_le16(IBNAL_MSG_VERSION)) {
2270                 CERROR ("Can't connect "LPX64": bad version %d\n",
2271                         conn->ibc_peer->ibp_nid, le16_to_cpu(wcr->wcr_version));
2272                 GOTO(reject, reason = cm_rej_code_usr_rej);
2273         }
2274                         
2275         if (wcr->wcr_queue_depth != cpu_to_le16(IBNAL_MSG_QUEUE_SIZE)) {
2276                 CERROR ("Can't connect "LPX64": bad queue depth %d\n",
2277                         conn->ibc_peer->ibp_nid, 
2278                         le16_to_cpu(wcr->wcr_queue_depth));
2279                 GOTO(reject, reason = cm_rej_code_usr_rej);
2280         }
2281                         
2282         if (le64_to_cpu(wcr->wcr_nid) != conn->ibc_peer->ibp_nid) {
2283                 CERROR ("Unexpected NID "LPX64" from "LPX64"\n",
2284                         le64_to_cpu(wcr->wcr_nid), conn->ibc_peer->ibp_nid);
2285                 GOTO(reject, reason = cm_rej_code_usr_rej);
2286         }
2287
2288         CDEBUG(D_NET, "Connection %p -> "LPX64" REP_RECEIVED.\n",
2289                conn, conn->ibc_peer->ibp_nid);
2290
2291         conn->ibc_incarnation = le64_to_cpu(wcr->wcr_incarnation);
2292         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2293
2294         retval = kibnal_qp_rts(conn->ibc_qp, rep->qpn, 
2295                             min_t(__u8, rep->arb_initiator_depth,
2296                                   ca_attr->max_read_atom_qp_outstanding),
2297                             &conn->ibc_connreq->cr_path, 
2298                             min_t(__u8, rep->arb_resp_res,
2299                                   ca_attr->max_qp_depth_for_init_read_atom),
2300                             rep->start_psn);
2301
2302         if (retval) {
2303                 CERROR("Connection %p -> "LPX64" QP RTS/RTR failed: %d\n",
2304                        conn, conn->ibc_peer->ibp_nid, retval);
2305                 GOTO(reject, reason = cm_rej_code_no_qp);
2306         }
2307
2308         dump_qp(conn);
2309
2310         /* the callback arguments are ignored for an active accept */
2311         /* TODO: memset cmrtu? */
2312         retval = cm_accept(cep, NULL, &conn->ibc_connreq->cr_cm_rtu, kibnal_cm_callback, conn);
2313         if (retval) {
2314                 CERROR("Connection %p -> "LPX64" CMAccept RTU failed: %d\n",
2315                        conn, conn->ibc_peer->ibp_nid, retval);
2316                 kibnal_connreq_done (conn, 1, -ECONNABORTED);
2317                 /* XXX don't call reject after accept fails? */
2318                 return;
2319         }
2320
2321         CDEBUG(D_NET, "Connection %p -> "LPX64" Established\n",
2322                conn, conn->ibc_peer->ibp_nid);
2323
2324         kibnal_connreq_done (conn, 1, 0);
2325
2326         return;
2327
2328 reject:
2329         kibnal_reject(cep, reason);
2330         kibnal_connreq_done (conn, 1, -EPROTO);
2331 }
2332
2333 /* Off level CM callback */
2334 static void
2335 _kibnal_cm_callback(void * arg)
2336 {
2337         struct cm_off_level *cm_tq = arg;
2338         cm_cep_handle_t cep = cm_tq->cep;
2339         cm_conn_data_t *info = cm_tq->info;
2340         kib_conn_t *conn = cm_tq->conn;
2341         vv_return_t retval;
2342
2343         CDEBUG(D_NET, "CM event 0x%x for CEP %p\n", info->status, cep);
2344
2345         PORTAL_FREE(cm_tq, sizeof(*cm_tq));
2346
2347         /* Established Connection Notifier */
2348         switch (info->status) {
2349         case cm_event_connected:
2350                 CDEBUG(D_NET, "Connection %p -> "LPX64" Established\n",
2351                        conn, conn->ibc_peer->ibp_nid);
2352                 kibnal_connreq_done (conn, 0, 0);
2353                 break;
2354
2355         case cm_event_conn_timeout:
2356         case cm_event_conn_reject:
2357                 /* TODO: be sure this is called only if the REQ times out. */
2358                 CERROR("connection timed out or rejected\n");
2359                 LASSERT(conn->ibc_state == IBNAL_CONN_CONNECTING);
2360                 conn->ibc_state = IBNAL_CONN_INIT_QP;
2361                 kibnal_connreq_done (conn, 1, -EINVAL);
2362                 break;
2363
2364         case cm_event_conn_reply:
2365                 kibnal_connect_reply(cep, info, conn);
2366                 break;
2367
2368         case cm_event_disconn_request:
2369                 /* XXX lock around these state management bits? */
2370                 if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
2371                         kibnal_close_conn (conn, 0);
2372                 conn->ibc_state = IBNAL_CONN_DREP;
2373                 
2374                 retval = cm_disconnect(conn->ibc_cep, NULL, &kibnal_data.cm_data.drep_data);
2375                 if (retval)
2376                         CERROR("disconnect rep failed: %d\n", retval);
2377
2378                 /* Fall through ... */
2379
2380         /* these both guarantee that no more cm callbacks will occur */
2381         case cm_event_disconnected: /* aka cm_event_disconn_timeout */
2382         case cm_event_disconn_reply:
2383                 CDEBUG(D_NET, "Connection %p -> "LPX64" disconnect done.\n",
2384                        conn, conn->ibc_peer->ibp_nid);
2385
2386                 conn->ibc_state = IBNAL_CONN_DISCONNECTED;
2387                 kibnal_flush_pending(conn);
2388                 kibnal_put_conn(conn);        /* Lose CM's ref */
2389                 break;
2390
2391         default:
2392                 CERROR("unknown status %d on Connection %p -> "LPX64"\n",
2393                        info->status, conn, conn->ibc_peer->ibp_nid);
2394                 LBUG();
2395                 break;
2396         }
2397
2398         return;
2399 }
2400
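/* CM events can arrive in a context where it isn't safe to do real work (note
 * the atomic allocation below), so each event is packaged into a cm_off_level
 * task and deferred via schedule_task(); the actual processing happens in
 * _kibnal_cm_callback() above. */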
2401 static void
2402 kibnal_cm_callback(cm_cep_handle_t cep, cm_conn_data_t *info, void *arg)
2403 {
2404         struct cm_off_level *cm_tq;
2405
2406         LASSERT(cep);
2407         LASSERT(info);
2408
2409         CDEBUG(D_NET, "CM event 0x%x for CEP %p\n", info->status, cep);
2410
2411         PORTAL_ALLOC_ATOMIC(cm_tq, sizeof(*cm_tq));
2412         if (cm_tq == NULL) {
2413                 CERROR("Failed to allocate a CM off level structure\n");
2414                 return;
2415         }
2416
2417         cm_tq->tq.sync = 0;
2418         cm_tq->tq.routine = _kibnal_cm_callback;
2419         cm_tq->tq.data = cm_tq;
2420
2421         cm_tq->cep = cep;
2422         cm_tq->info = info;
2423         cm_tq->conn = (kib_conn_t *)arg;
2424
2425         schedule_task(&cm_tq->tq);
2426 }
2427
2428 static int
2429 kibnal_set_cm_flags(cm_cep_handle_t cep)
2430 {
2431 #ifdef TODO
2432 /* The Voltaire CM does not appear to have this functionality */
2433         FSTATUS frc;
2434         uint32 value = 1;
2435
2436         frc = iibt_cm_modify_cep(cep, CM_FLAG_TIMEWAIT_CALLBACK,
2437                                  (char *)&value, sizeof(value), 0);
2438         if (frc != FSUCCESS) {
2439                 CERROR("error setting timeout callback: %d\n", frc);
2440                 return -1;
2441         }
2442
2443 #if 0
2444         frc = iibt_cm_modify_cep(cep, CM_FLAG_ASYNC_ACCEPT, (char *)&value,
2445                                  sizeof(value), 0);
2446         if (frc != FSUCCESS) {
2447                 CERROR("error setting async accept: %d\n", frc);
2448                 return -1;
2449         }
2450 #endif
2451 #endif
2452
2453         return 0;
2454 }
2455
2456 /* Off level listen callback */
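/* Passive side: validate the wire connreq in the REQ's private data, create
 * the conn/peer with kibnal_accept(), bring the QP to RTS, then cm_accept()
 * with our own reply (QPN, PSN and wire connreq) to complete the handshake. */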
2457 static void
2458 _kibnal_listen_callback(void *arg)
2459 {
2460         struct cm_off_level *cm_tq = arg;
2461         cm_cep_handle_t cep = cm_tq->cep;
2462         cm_conn_data_t *info = cm_tq->info;
2463         vv_hca_attrib_t *ca_attr = &kibnal_data.kib_hca_attrs;
2464         cm_request_data_t  *req;
2465         cm_reply_data_t    *rep = NULL;
2466         kib_wire_connreq_t *wcr;
2467         kib_conn_t         *conn = NULL;
2468         cm_rej_code_t       reason = 0;
2469         int                 rc = 0;
2470         vv_return_t         retval;
2471         vv_qp_attr_t       *query;
2472         void               *qp_context;
2473
2474         LASSERT(cep);
2475         LASSERT(info);
2476
2477         CDEBUG(D_NET, "LISTEN status 0x%x for CEP %p\n", info->status, cep);
2478
2479         PORTAL_FREE(cm_tq, sizeof(*cm_tq));
2480
2481         req = &info->data.request;
2482         wcr = (kib_wire_connreq_t *)req->priv_data;
2483
2484         CDEBUG(D_NET, "%d from "LPX64"\n", info->status, 
2485                le64_to_cpu(wcr->wcr_nid));
2486         
2487 #ifdef TODO
2488         /* is there an equivalent? */
2489         if (info->status == FCM_CONNECT_CANCEL)
2490                 return;
2491 #endif
2492         
2493         LASSERT (info->status == cm_event_conn_request);
2494         
2495         if (wcr->wcr_magic != cpu_to_le32(IBNAL_MSG_MAGIC)) {
2496                 CERROR ("Can't accept: bad magic %08x\n",
2497                         le32_to_cpu(wcr->wcr_magic));
2498                 GOTO(out, reason = cm_rej_code_usr_rej);
2499         }
2500
2501         if (wcr->wcr_version != cpu_to_le16(IBNAL_MSG_VERSION)) {
2502                 CERROR ("Can't accept: bad version %d\n",
2503                         le16_to_cpu(wcr->wcr_version));
2504                 GOTO(out, reason = cm_rej_code_usr_rej);
2505         }
2506
2507         rc = kibnal_accept(&conn, cep,
2508                            le64_to_cpu(wcr->wcr_nid),
2509                            le64_to_cpu(wcr->wcr_incarnation),
2510                            le16_to_cpu(wcr->wcr_queue_depth));
2511         if (rc != 0) {
2512                 CERROR ("Can't accept "LPX64": %d\n",
2513                         le64_to_cpu(wcr->wcr_nid), rc);
2514                 GOTO(out, reason = cm_rej_code_no_res);
2515         }
2516
2517         /* TODO: I hope I got the ca_attr names right. */
2518         retval = kibnal_qp_rts(conn->ibc_qp, req->cep_data.qpn,
2519                             min_t(__u8, req->cep_data.offered_initiator_depth, 
2520                                   ca_attr->max_read_atom_qp_outstanding),
2521                             &req->path_data.path,
2522                             min_t(__u8, req->cep_data.offered_resp_res, 
2523                                   ca_attr->max_qp_depth_for_init_read_atom),
2524                             req->cep_data.start_psn);
2525
2526         if (retval) {
2527                 CERROR ("Can't mark QP RTS/RTR  "LPX64": %d\n",
2528                         le64_to_cpu(wcr->wcr_nid), retval);
2529                 GOTO(out, reason = cm_rej_code_no_qp);
2530         }
2531
2532         dump_qp(conn);
2533
2534         retval = vv_qp_query(kibnal_data.kib_hca, conn->ibc_qp, &qp_context, &conn->ibc_qp_attrs);
2535         if (retval) {
2536                 CERROR ("Couldn't query qp attributes "LPX64": %d\n",
2537                         le64_to_cpu(wcr->wcr_nid), retval);
2538                 GOTO(out, reason = cm_rej_code_no_qp);
2539         }
2540         query = &conn->ibc_qp_attrs;
2541
2542         PORTAL_ALLOC(rep, sizeof(*rep));
2543         if (rep == NULL) {
2544                 CERROR ("Can't allocate connection reply buffer\n");
2545                 GOTO(out, reason = cm_rej_code_insuff_resp_res);
2546         }
2547
2548         /* don't try to deref this into the incoming wcr :) */
2549         wcr = (kib_wire_connreq_t *)rep->priv_data;
2550
2551         *rep = (cm_reply_data_t) {
2552                 .qpn = query->query.qp_num,
2553                 .start_psn = query->query.receve_psn,
2554                 .arb_resp_res = query->query.rdma_r_atom_outstand_num,
2555                 .arb_initiator_depth = query->query.rdma_r_atom_outstand_num,
2556                 .targ_ack_delay = 0,
2557                 .failover_accepted = 0,
2558                 .end_to_end_flow_ctrl = 1, /* (query->query.flow_control is never set) */
2559                 .rnr_retry_count = req->cep_data.rtr_retry_cnt,
2560         };
2561
2562         *wcr = (kib_wire_connreq_t) {
2563                 .wcr_magic       = cpu_to_le32(IBNAL_MSG_MAGIC),
2564                 .wcr_version     = cpu_to_le16(IBNAL_MSG_VERSION),
2565                 .wcr_queue_depth = cpu_to_le16(IBNAL_MSG_QUEUE_SIZE),
2566                 .wcr_nid         = cpu_to_le64(kibnal_data.kib_nid),
2567                 .wcr_incarnation = cpu_to_le64(kibnal_data.kib_incarnation),
2568         };
2569
2570         retval = cm_accept(cep, rep, NULL, kibnal_cm_callback, conn);
2571
2572         PORTAL_FREE(rep, sizeof(*rep));
2573
2574         if (retval) {
2575                 /* XXX it seems we don't call reject after this point? */
2576                 CERROR("cm_accept() failed: %d, aborting\n", retval);
2577                 rc = -ECONNABORTED;
2578                 goto out;
2579         }
2580
2581         if (kibnal_set_cm_flags(cep)) {
2582                 rc = -ECONNABORTED;
2583                 goto out;
2584         }
2585
2586         conn->ibc_cep = cep;
2587
2588         CDEBUG(D_WARNING, "Connection %p -> "LPX64" ESTABLISHED.\n",
2589                conn, conn->ibc_peer->ibp_nid);
2590
2591 out:
2592         if (reason) {
2593                 kibnal_reject(cep, reason);
2594                 rc = -ECONNABORTED;
2595         }
2596
2597         return;
2598 }
2599
2600 void
2601 kibnal_listen_callback(cm_cep_handle_t cep, cm_conn_data_t *info, void *arg)
2602 {
2603         struct cm_off_level *cm_tq;
2604
2605         LASSERT(cep);
2606         LASSERT(info);
2607         LASSERT(arg == NULL); /* no conn yet for passive */
2608
2609         PORTAL_ALLOC_ATOMIC(cm_tq, sizeof(*cm_tq));
2610         if (cm_tq == NULL) {
2611                 CERROR("Failed to allocate a CM off level structure\n");
2612                 return;
2613         }
2614
2615         cm_tq->tq.sync = 0;
2616         cm_tq->tq.routine = _kibnal_listen_callback;
2617         cm_tq->tq.data = cm_tq;
2618
2619         cm_tq->cep = cep;
2620         cm_tq->info = info;
2621         cm_tq->conn = NULL;
2622
2623         schedule_task(&cm_tq->tq);
2624 }
2625
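/* Completion of the SA path record query for an active connect: byte-swap the
 * returned path into host order, create the CEP, fill in the wire connreq and
 * the CM request (QP, PSN, timeouts), then cm_connect() to send the REQ. */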
2626 static void
2627 kibnal_pathreq_callback (struct sa_request *request)
2628 {
2629         vv_hca_attrib_t *ca_attr = &kibnal_data.kib_hca_attrs;
2630         kib_conn_t *conn = request->context;
2631         gsi_dtgrm_t *dtgrm;
2632         sa_mad_v2_t *mad;
2633         ib_path_record_v2_t *path;
2634         u64 component_mask;
2635         cm_return_t cmret;
2636
2637         if (request->status) {
2638                 CERROR ("status %d\n", request->status);
2639                 free_sa_request(request);
2640                 kibnal_connreq_done (conn, 1, -EINVAL);
2641                 return;
2642         }
2643
2644         dtgrm = request->dtgrm_resp;
2645         mad = (sa_mad_v2_t *) dtgrm->mad;
2646         path = (ib_path_record_v2_t *) mad->payload;
2647
2648         /* Put the path record in host order for that stack. */
2649         gid_swap(&path->sgid);
2650         gid_swap(&path->dgid);
2651         path->slid = be16_to_cpu(path->slid);
2652         path->dlid = be16_to_cpu(path->dlid);
2653         path->flow_label = be32_to_cpu(path->flow_label);
2654         path->pkey = be16_to_cpu(path->pkey);
2655         path->sl = be16_to_cpu(path->sl);
2656
2657         CDEBUG(D_NET, "sgid "LPX64":"LPX64" dgid "
2658                LPX64":"LPX64" pkey %x\n",
2659                path->sgid.scope.g.subnet,
2660                path->sgid.scope.g.eui64,
2661                path->dgid.scope.g.subnet,
2662                path->dgid.scope.g.eui64,
2663                path->pkey);
2664
2665 #if TODO
2666         component_mask = be64_to_cpu(mad->component_mask);
2667         if ((component_mask & (1ull << 1)) == 0) {
2668                 CERROR ("no service GID in SR: "LPX64"\n", component_mask);
2669                 free_sa_request(request);
2670                 kibnal_connreq_done (conn, 1, -EINVAL);
2671                 return;
2672         }
2673 #endif
2674
2675         conn->ibc_connreq->cr_path = *path;
2676
2677         free_sa_request(request);    
2678
2679         conn->ibc_cep = cm_create_cep(cm_cep_transp_rc);
2680         if (conn->ibc_cep == NULL) {
2681                 CERROR ("Can't create CEP\n");
2682                 kibnal_connreq_done (conn, 1, -EINVAL);
2683                 return;
2684         }
2685
2686         if (kibnal_set_cm_flags(conn->ibc_cep)) {
2687                 kibnal_connreq_done (conn, 1, -EINVAL);
2688                 return;
2689         }
2690
2691         conn->ibc_connreq->cr_wcr = (kib_wire_connreq_t) {
2692                 .wcr_magic       = cpu_to_le32(IBNAL_MSG_MAGIC),
2693                 .wcr_version     = cpu_to_le16(IBNAL_MSG_VERSION),
2694                 .wcr_queue_depth = cpu_to_le16(IBNAL_MSG_QUEUE_SIZE),
2695                 .wcr_nid         = cpu_to_le64(kibnal_data.kib_nid),
2696                 .wcr_incarnation = cpu_to_le64(kibnal_data.kib_incarnation),
2697         };
2698
2699         conn->ibc_connreq->cr_cm_req = (cm_request_data_t) {
2700                 .sid = kibnal_data.kib_service_id,
2701                 .cep_data = (cm_cep_data_t) { 
2702                         .ca_guid = kibnal_data.kib_hca_attrs.guid,
2703                         .end_to_end_flow_ctrl = 1,
2704                         .port_guid = kibnal_data.kib_port_gid.scope.g.eui64,
2705                         .local_port_num = kibnal_data.kib_port,
2706                         .start_psn = IBNAL_STARTING_PSN,
2707                         .qpn = conn->ibc_qp_attrs.query.qp_num,
2708                         .retry_cnt = IBNAL_RETRY,
2709                         .rtr_retry_cnt = IBNAL_RNR_RETRY,
2710                         .ack_timeout = IBNAL_ACK_TIMEOUT,
2711                         .offered_resp_res = ca_attr->max_read_atom_qp_outstanding,
2712                         .offered_initiator_depth = ca_attr->max_qp_depth_for_init_read_atom,
2713                 },
2714                 .path_data = (cm_cep_path_data_t) {
2715                         .subn_local = TRUE,
2716                         .path = conn->ibc_connreq->cr_path,
2717                 },
2718         };
2719
2720 #if 0
2721         /* XXX set timeout just like SDP!!!*/
2722         conn->ibc_connreq->cr_path.packet_life = 13;
2723 #endif
2724         /* Flag I'm getting involved with the CM... */
2725         conn->ibc_state = IBNAL_CONN_CONNECTING;
2726
2727 #if 0
2728         CDEBUG(D_NET, "Connecting to, service id "LPX64", on "LPX64"\n",
2729                conn->ibc_connreq->cr_service.RID.ServiceID, 
2730                *kibnal_service_nid_field(&conn->ibc_connreq->cr_service));
2731 #endif
2732
2733         memset(conn->ibc_connreq->cr_cm_req.priv_data, 0, 
2734                cm_REQ_priv_data_len);
2735         memcpy(conn->ibc_connreq->cr_cm_req.priv_data, 
2736                &conn->ibc_connreq->cr_wcr, sizeof(conn->ibc_connreq->cr_wcr));
2737
2738         /* kibnal_cm_callback gets my conn ref */
2739         cmret = cm_connect(conn->ibc_cep, &conn->ibc_connreq->cr_cm_req,
2740                               kibnal_cm_callback, conn);
2741
2742         if (cmret) {
2743                 CERROR ("Connect failed: %d\n", cmret);
2744                 /* Back out state change as connect failed */
2745                 conn->ibc_state = IBNAL_CONN_INIT_QP;
2746                 kibnal_connreq_done (conn, 1, -EINVAL);
2747         }
2748
2749         CDEBUG(D_NET, "connection REQ sent\n");
2750 }
2751
2752 static void
2753 kibnal_service_get_callback (struct sa_request *request)
2754 {
2755         kib_conn_t *conn = request->context;
2756         gsi_dtgrm_t *dtgrm;
2757         sa_mad_v2_t *mad;
2758         ib_service_record_v2_t *sr;
2759         u64 component_mask;
2760         int ret;
2761
2762         if (request->status) {
2763                 CERROR ("status %d\n", request->status);
2764                 free_sa_request(request);
2765                 kibnal_connreq_done (conn, 1, -EINVAL);
2766                 return;
2767         }
2768
2769         dtgrm = request->dtgrm_resp;
2770         mad = (sa_mad_v2_t *) dtgrm->mad;
2771         sr = (ib_service_record_v2_t *) mad->payload;
2772
2773         CDEBUG(D_NET, "sid "LPX64" gid "LPX64":"LPX64" pkey %x\n",
2774                sr->service_id,
2775                sr->service_gid.scope.g.subnet,
2776                sr->service_gid.scope.g.eui64,
2777                sr->service_pkey);
2778
2779         component_mask = be64_to_cpu(mad->component_mask);
2780         if ((component_mask & (1ull << 1)) == 0) {
2781                 CERROR ("no service GID in SR: "LPX64"\n", component_mask);
2782                 free_sa_request(request);
2783                 kibnal_connreq_done (conn, 1, -EINVAL);
2784                 return;
2785         }
2786
2787         //conn->ibc_connreq->cr_service = sr;
2788
2789         /* Return the response datagram to its pool. We don't need it anymore. */
2790         gsi_dtgrm_pool_put(request->dtgrm_resp);
2791         request->dtgrm_resp = NULL;
2792
2793         /* kibnal_pathreq_callback gets my conn ref */
2794         ret = kibnal_pathrecord_op(request, sr->service_gid, kibnal_pathreq_callback, conn);
2795         if (ret) {
2796                 CERROR ("Path record request failed: %d\n", ret);
2797                 kibnal_connreq_done (conn, 1, -EINVAL);
2798         }
2799
2800         return;
2801 }
2802
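/* Active connection establishment starts here and proceeds through a chain of
 * callbacks: kibnal_connect_peer() issues a service record query, its
 * completion (kibnal_service_get_callback) issues a path record query, whose
 * completion (kibnal_pathreq_callback) sends the CM REQ; everything after
 * that is driven by kibnal_cm_callback(). */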
2803 static void
2804 kibnal_connect_peer (kib_peer_t *peer)
2805 {
2806         kib_conn_t  *conn = kibnal_create_conn();
2807         struct sa_request *request;
2808         int ret;
2809
2810         LASSERT (peer->ibp_connecting != 0);
2811
2812         if (conn == NULL) {
2813                 CERROR ("Can't allocate conn\n");
2814                 kibnal_peer_connect_failed (peer, 1, -ENOMEM);
2815                 return;
2816         }
2817
2818         conn->ibc_peer = peer;
2819         kib_peer_addref(peer);
2820
2821         PORTAL_ALLOC (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
2822         if (conn->ibc_connreq == NULL) {
2823                 CERROR ("Can't allocate connreq\n");
2824                 kibnal_connreq_done (conn, 1, -ENOMEM);
2825                 return;
2826         }
2827
2828         memset(conn->ibc_connreq, 0, sizeof (*conn->ibc_connreq));
2829
2830         /* kibnal_service_get_callback gets my conn ref */
2831         ret = kibnal_advertize_op(peer->ibp_nid, SUBN_ADM_GET, kibnal_service_get_callback, conn);
2832
2833         if (ret) {
2834                 CERROR("kibnal_advertize_op failed for op %d NID "LPX64": %d\n", SUBN_ADM_GET, peer->ibp_nid, ret);
2835                 /* TODO: I'm unsure yet whether ret contains a
2836                  * consistent error type, so I return -EIO in the
2837                  * meantime. */
2838                 kibnal_connreq_done (conn, 1, -EIO);
2839         }
2840
2841         return;
2842 }
2843
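/* Return nonzero if any tx queued on or active over 'conn' has passed its
 * deadline; takes and releases ibc_lock internally. */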
2844 static int
2845 kibnal_conn_timed_out (kib_conn_t *conn)
2846 {
2847         kib_tx_t          *tx;
2848         struct list_head  *ttmp;
2849         unsigned long      flags;
2850
2851         spin_lock_irqsave (&conn->ibc_lock, flags);
2852
2853         list_for_each (ttmp, &conn->ibc_tx_queue) {
2854                 tx = list_entry (ttmp, kib_tx_t, tx_list);
2855
2856                 LASSERT (!tx->tx_passive_rdma_wait);
2857                 LASSERT (tx->tx_sending == 0);
2858
2859                 if (time_after_eq (jiffies, tx->tx_deadline)) {
2860                         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2861                         return 1;
2862                 }
2863         }
2864
2865         list_for_each (ttmp, &conn->ibc_active_txs) {
2866                 tx = list_entry (ttmp, kib_tx_t, tx_list);
2867
2868                 LASSERT (tx->tx_passive_rdma ||
2869                          !tx->tx_passive_rdma_wait);
2870
2871                 LASSERT (tx->tx_passive_rdma_wait ||
2872                          tx->tx_sending != 0);
2873
2874                 if (time_after_eq (jiffies, tx->tx_deadline)) {
2875                         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2876                         return 1;
2877                 }
2878         }
2879
2880         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2881
2882         return 0;
2883 }
2884
2885 static void
2886 kibnal_check_conns (int idx)
2887 {
2888         struct list_head  *peers = &kibnal_data.kib_peers[idx];
2889         struct list_head  *ptmp;
2890         kib_peer_t        *peer;
2891         kib_conn_t        *conn;
2892         struct list_head  *ctmp;
2893         unsigned long      flags;
2894
2895  again:
2896         /* NB. We expect to have a look at all the peers and not find any
2897          * rdmas to time out, so we just use a shared lock while we
2898          * take a look... */
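        /* If a conn does look timed out we take a ref on it, drop the read
         * lock to close it, then rescan from the top since the peer and conn
         * lists may have changed while the lock was dropped. */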
2899         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2900
2901         list_for_each (ptmp, peers) {
2902                 peer = list_entry (ptmp, kib_peer_t, ibp_list);
2903
2904                 list_for_each (ctmp, &peer->ibp_conns) {
2905                         conn = list_entry (ctmp, kib_conn_t, ibc_list);
2906
2907                         KIB_ASSERT_CONN_STATE(conn, IBNAL_CONN_ESTABLISHED);
2908
2909                         /* In case we have enough credits to return via a
2910                          * NOOP, but there were no non-blocking tx descs
2911                          * free to do it last time... */
2912                         kibnal_check_sends(conn);
2913
2914                         if (!kibnal_conn_timed_out(conn))
2915                                 continue;
2916                         
2917                         CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
2918                                conn, conn->ibc_state, peer->ibp_nid,
2919                                atomic_read (&conn->ibc_refcount));
2920
2921                         atomic_inc (&conn->ibc_refcount);
2922                         read_unlock_irqrestore(&kibnal_data.kib_global_lock,
2923                                                flags);
2924
2925                         CERROR("Timed out RDMA with "LPX64"\n",
2926                                peer->ibp_nid);
2927
2928                         kibnal_close_conn (conn, -ETIMEDOUT);
2929                         kibnal_put_conn (conn);
2930
2931                         /* start again now I've dropped the lock */
2932                         goto again;
2933                 }
2934         }
2935
2936         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2937 }
2938
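/* Handle a conn pulled off kib_connd_conns: free a fully disconnected conn,
 * send the disconnect request (DREQ) for one whose teardown is pending, and
 * drop the ref the conn was queued with. */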
2939 static void
2940 kib_connd_handle_state(kib_conn_t *conn)
2941 {
2942         vv_return_t retval;
2943
2944         switch (conn->ibc_state) {
2945                 /* all refs have gone, free and be done with it */ 
2946                 case IBNAL_CONN_DISCONNECTED:
2947                         kibnal_destroy_conn (conn);
2948                         return; /* avoid put_conn */
2949
2950                 case IBNAL_CONN_SEND_DREQ:
2951                         
2952                         retval = cm_disconnect(conn->ibc_cep, &kibnal_data.cm_data.dreq_data, NULL);
2953                         if (retval) /* XXX do real things */
2954                                 CERROR("disconnect failed: %d\n", retval);
2955                         
2956                         conn->ibc_state = IBNAL_CONN_DREQ;
2957                         break;
2958
2959                 /* a callback got to the conn before we did */ 
2960                 case IBNAL_CONN_DREP:
2961                         break;
2962                                 
2963                 default:
2964                         CERROR ("Bad conn %p state: %d\n", conn, 
2965                                 conn->ibc_state);
2966                         LBUG();
2967                         break;
2968         }
2969
2970         /* drop ref from close_conn */
2971         kibnal_put_conn(conn);
2972 }
2973
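/* Connection daemon: runs conn state transitions that need process context,
 * initiates connections to queued peers, and periodically sweeps the peer
 * table for timed-out RDMAs. */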
2974 int
2975 kibnal_connd (void *arg)
2976 {
2977         wait_queue_t       wait;
2978         unsigned long      flags;
2979         kib_conn_t        *conn;
2980         kib_peer_t        *peer;
2981         int                timeout;
2982         int                i;
2983         int                peer_index = 0;
2984         unsigned long      deadline = jiffies;
2985         
2986         kportal_daemonize ("kibnal_connd");
2987         kportal_blockallsigs ();
2988
2989         init_waitqueue_entry (&wait, current);
2990
2991         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
2992
2993         for (;;) {
2994                 if (!list_empty (&kibnal_data.kib_connd_conns)) {
2995                         conn = list_entry (kibnal_data.kib_connd_conns.next,
2996                                            kib_conn_t, ibc_list);
2997                         list_del (&conn->ibc_list);
2998                         
2999                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
3000                         kib_connd_handle_state(conn);
3001
3002                         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
3003                         continue;
3004                 }
3005
3006                 if (!list_empty (&kibnal_data.kib_connd_peers)) {
3007                         peer = list_entry (kibnal_data.kib_connd_peers.next,
3008                                            kib_peer_t, ibp_connd_list);
3009                         
3010                         list_del_init (&peer->ibp_connd_list);
3011                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
3012
3013                         kibnal_connect_peer (peer);
3014                         kib_peer_decref (peer);
3015
3016                         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
3017                 }
3018
3019                 /* shut down and nobody left to reap... */
3020                 if (kibnal_data.kib_shutdown &&
3021                     atomic_read(&kibnal_data.kib_nconns) == 0)
3022                         break;
3023
3024                 spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
3025
3026                 /* careful with the jiffy wrap... */
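                /* (deadline - jiffies) is taken as a signed difference, so
                 * the "deadline has passed" test stays correct even when the
                 * jiffies counter wraps around. */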
3027                 while ((timeout = (int)(deadline - jiffies)) <= 0) {
3028                         const int n = 4;
3029                         const int p = 1;
3030                         int       chunk = kibnal_data.kib_peer_hash_size;
3031                         
3032                         /* Time to check for RDMA timeouts on a few more
3033                          * peers: I do checks every 'p' seconds on a
3034                          * proportion of the peer table and I need to check
3035                          * every connection 'n' times within a timeout
3036                          * interval, to ensure I detect a timeout on any
3037                          * connection within (n+1)/n times the timeout
3038                          * interval. */
3039
3040                         if (kibnal_tunables.kib_io_timeout > n * p)
3041                                 chunk = (chunk * n * p) / 
3042                                         kibnal_tunables.kib_io_timeout;
3043                         if (chunk == 0)
3044                                 chunk = 1;
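                        /* Illustrative numbers: with a peer hash size of 120,
                         * n = 4, p = 1 and kib_io_timeout = 60, chunk becomes
                         * 120*4*1/60 = 8 buckets per wakeup, so the whole
                         * table is swept every 15 seconds, i.e. 4 times per
                         * timeout interval as intended. */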
3045
3046                         for (i = 0; i < chunk; i++) {
3047                                 kibnal_check_conns (peer_index);
3048                                 peer_index = (peer_index + 1) % 
3049                                              kibnal_data.kib_peer_hash_size;
3050                         }
3051
3052                         deadline += p * HZ;
3053                 }
3054
3055                 kibnal_data.kib_connd_waketime = jiffies + timeout;
3056
3057                 set_current_state (TASK_INTERRUPTIBLE);
3058                 add_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
3059
3060                 if (!kibnal_data.kib_shutdown &&
3061                     list_empty (&kibnal_data.kib_connd_conns) &&
3062                     list_empty (&kibnal_data.kib_connd_peers))
3063                         schedule_timeout (timeout);
3064
3065                 set_current_state (TASK_RUNNING);
3066                 remove_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
3067
3068                 spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
3069         }
3070
3071         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
3072
3073         kibnal_thread_fini ();
3074         return (0);
3075 }
3076
3077 int
3078 kibnal_scheduler(void *arg)
3079 {
3080         long            id = (long)arg;
3081         char            name[16];
3082         kib_rx_t       *rx;
3083         kib_tx_t       *tx;
3084         unsigned long   flags;
3085         int             rc;
3086         int             counter = 0;
3087         int             did_something;
3088
3089         snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
3090         kportal_daemonize(name);
3091         kportal_blockallsigs();
3092
3093         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
3094
3095         for (;;) {
3096                 did_something = 0;
3097
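                /* First finalise txs whose completion was deferred to this
                 * thread (e.g. memory can't be deregistered in IRQ context):
                 * unlink under the scheduler lock, then drop the lock around
                 * kibnal_tx_done(). */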
3098                 while (!list_empty(&kibnal_data.kib_sched_txq)) {
3099                         tx = list_entry(kibnal_data.kib_sched_txq.next,
3100                                         kib_tx_t, tx_list);
3101                         list_del(&tx->tx_list);
3102                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
3103                                                flags);
3104                         kibnal_tx_done(tx);
3105
3106                         spin_lock_irqsave(&kibnal_data.kib_sched_lock,
3107                                           flags);
3108                 }
3109
3110                 if (!list_empty(&kibnal_data.kib_sched_rxq)) {
3111                         rx = list_entry(kibnal_data.kib_sched_rxq.next,
3112                                         kib_rx_t, rx_list);
3113                         list_del(&rx->rx_list);
3114                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
3115                                                flags);
3116
3117                         kibnal_rx(rx);
3118
3119                         did_something = 1;
3120                         spin_lock_irqsave(&kibnal_data.kib_sched_lock,
3121                                           flags);
3122                 }
3123
3124                 /* shut down and no receives to complete... */
3125                 if (kibnal_data.kib_shutdown &&
3126                     atomic_read(&kibnal_data.kib_nconns) == 0)
3127                         break;
3128
3129                 /* nothing left to do, or we've held the CPU for IBNAL_RESCHED passes: drop the lock and sleep or yield */
3130                 if (!did_something || counter++ == IBNAL_RESCHED) {
3131                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
3132                                                flags);
3133                         counter = 0;
3134
3135                         if (!did_something) {
3136                                 rc = wait_event_interruptible(
3137                                         kibnal_data.kib_sched_waitq,
3138                                         !list_empty(&kibnal_data.kib_sched_txq) || 
3139                                         !list_empty(&kibnal_data.kib_sched_rxq) || 
3140                                         (kibnal_data.kib_shutdown &&
3141                                          atomic_read (&kibnal_data.kib_nconns) == 0));
3142                         } else {
3143                                 our_cond_resched();
3144                         }
3145
3146                         spin_lock_irqsave(&kibnal_data.kib_sched_lock,
3147                                           flags);
3148                 }
3149         }
3150
3151         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
3152
3153         kibnal_thread_fini();
3154         return (0);
3155 }
3156
3157
3158 lib_nal_t kibnal_lib = {
3159         .libnal_data = &kibnal_data,      /* NAL private data */
3160         .libnal_send = kibnal_send,
3161         .libnal_send_pages = kibnal_send_pages,
3162         .libnal_recv = kibnal_recv,
3163         .libnal_recv_pages = kibnal_recv_pages,
3164         .libnal_dist = kibnal_dist
3165 };