lnet/klnds/viblnd/viblnd_cb.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *   Author: Frank Zago <fzago@systemfabricworks.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "vibnal.h"

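/* NB on flow control, as implemented below: message credits are piggy-backed
 * on every message by kibnal_pack_msg().  ibc_credits is how many sends I may
 * post right now; ibc_outstanding_credits is how many receive buffers I have
 * re-posted and owe back to my peer.  A tx descriptor cycles through
 * idle pool -> ibc_tx_queue -> ibc_active_txs -> kibnal_tx_done(); it may be
 * recycled only when it is neither awaiting a send completion (tx_sending)
 * nor a response from its peer (tx_waiting). */
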
void
kibnal_tx_done (kib_tx_t *tx)
{
        ptl_err_t        ptlrc = (tx->tx_status == 0) ? PTL_OK : PTL_FAIL;
        int              i;

        LASSERT (!in_interrupt());
        LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting callback */
        LASSERT (!tx->tx_waiting);              /* mustn't be awaiting peer response */

#if !IBNAL_WHOLE_MEM
        switch (tx->tx_mapped) {
        default:
                LBUG();

        case KIB_TX_UNMAPPED:
                break;

        case KIB_TX_MAPPED: {
                vv_return_t      vvrc;

                vvrc = vv_mem_region_destroy(kibnal_data.kib_hca,
                                             tx->tx_md.md_handle);
                LASSERT (vvrc == vv_return_ok);
                tx->tx_mapped = KIB_TX_UNMAPPED;
                break;
        }
        }
#endif
        for (i = 0; i < 2; i++) {
                /* tx may have up to 2 libmsgs to finalise */
                if (tx->tx_libmsg[i] == NULL)
                        continue;

                lib_finalize (&kibnal_lib, NULL, tx->tx_libmsg[i], ptlrc);
                tx->tx_libmsg[i] = NULL;
        }

        if (tx->tx_conn != NULL) {
                kibnal_conn_decref(tx->tx_conn);
                tx->tx_conn = NULL;
        }

        tx->tx_nwrq = 0;
        tx->tx_status = 0;

        spin_lock(&kibnal_data.kib_tx_lock);

        if (tx->tx_isnblk) {
                list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_nblk_txs);
        } else {
                list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_txs);
                wake_up (&kibnal_data.kib_idle_tx_waitq);
        }

        spin_unlock(&kibnal_data.kib_tx_lock);
}

kib_tx_t *
kibnal_get_idle_tx (int may_block)
{
        kib_tx_t      *tx = NULL;
        ENTRY;

        for (;;) {
                spin_lock(&kibnal_data.kib_tx_lock);

                /* "normal" descriptor is free */
                if (!list_empty (&kibnal_data.kib_idle_txs)) {
                        tx = list_entry (kibnal_data.kib_idle_txs.next,
                                         kib_tx_t, tx_list);
                        break;
                }

                if (!may_block) {
                        /* may dip into reserve pool */
                        if (list_empty (&kibnal_data.kib_idle_nblk_txs)) {
                                CERROR ("reserved tx desc pool exhausted\n");
                                break;
                        }

                        tx = list_entry (kibnal_data.kib_idle_nblk_txs.next,
                                         kib_tx_t, tx_list);
                        break;
                }

                /* block for idle tx */
                spin_unlock(&kibnal_data.kib_tx_lock);

                wait_event (kibnal_data.kib_idle_tx_waitq,
                            !list_empty (&kibnal_data.kib_idle_txs) ||
                            kibnal_data.kib_shutdown);
        }

        if (tx != NULL) {
                list_del (&tx->tx_list);

                /* Allocate a new completion cookie.  It might not be needed,
                 * but we've got a lock right now and we're unlikely to
                 * wrap... */
                tx->tx_cookie = kibnal_data.kib_next_tx_cookie++;
#if !IBNAL_WHOLE_MEM
                LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
#endif
                LASSERT (tx->tx_nwrq == 0);
                LASSERT (tx->tx_sending == 0);
                LASSERT (!tx->tx_waiting);
                LASSERT (tx->tx_status == 0);
                LASSERT (tx->tx_conn == NULL);
                LASSERT (tx->tx_libmsg[0] == NULL);
                LASSERT (tx->tx_libmsg[1] == NULL);
        }

        spin_unlock(&kibnal_data.kib_tx_lock);

        RETURN(tx);
}

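/* Re-post a receive buffer.  'credit' is non-zero when the rx being replaced
 * carried a message that consumed one of the peer's credits; in that case the
 * credit is queued for return via ibc_outstanding_credits, and
 * kibnal_check_sends() gets a chance to piggy-back it on an outgoing
 * message. */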
int
kibnal_post_rx (kib_rx_t *rx, int credit)
{
        kib_conn_t   *conn = rx->rx_conn;
        int           rc = 0;
        vv_return_t   vvrc;

        LASSERT (!in_interrupt());

        rx->rx_gl = (vv_scatgat_t) {
                .v_address = (void *)((unsigned long)KIBNAL_RX_VADDR(rx)),
                .l_key     = KIBNAL_RX_LKEY(rx),
                .length    = IBNAL_MSG_SIZE,
        };

        rx->rx_wrq = (vv_wr_t) {
                .wr_id                   = (unsigned long)rx,
                .completion_notification = 1,
                .scatgat_list            = &rx->rx_gl,
                .num_of_data_segments    = 1,
                .wr_type                 = vv_wr_receive,
        };

        LASSERT (conn->ibc_state >= IBNAL_CONN_INIT);
        LASSERT (!rx->rx_posted);

        CDEBUG(D_NET, "posting rx [%d %x %p]\n",
               rx->rx_wrq.scatgat_list->length,
               rx->rx_wrq.scatgat_list->l_key,
               rx->rx_wrq.scatgat_list->v_address);

        if (conn->ibc_state > IBNAL_CONN_ESTABLISHED) {
                /* No more posts for this rx; so lose its ref */
                kibnal_conn_decref(conn);
                return 0;
        }

        rx->rx_posted = 1;

        spin_lock(&conn->ibc_lock);
        /* Serialise vv_post_receive; it's not re-entrant on the same QP */
        vvrc = vv_post_receive(kibnal_data.kib_hca,
                               conn->ibc_qp, &rx->rx_wrq);
        spin_unlock(&conn->ibc_lock);

        if (vvrc == vv_return_ok) {
                if (credit) {
                        spin_lock(&conn->ibc_lock);
                        conn->ibc_outstanding_credits++;
                        spin_unlock(&conn->ibc_lock);

                        kibnal_check_sends(conn);
                }
                return 0;
        }

        CERROR ("post rx -> "LPX64" failed %d\n",
                conn->ibc_peer->ibp_nid, vvrc);
        rc = -EIO;
        kibnal_close_conn(rx->rx_conn, rc);
        /* No more posts for this rx; so lose its ref */
        kibnal_conn_decref(conn);
        return rc;
}

int
kibnal_post_receives (kib_conn_t *conn)
{
        int    i;
        int    rc;

        LASSERT (conn->ibc_state < IBNAL_CONN_ESTABLISHED);
        LASSERT (conn->ibc_comms_error == 0);

        for (i = 0; i < IBNAL_RX_MSGS; i++) {
                /* +1 ref for rx desc.  This ref remains until kibnal_post_rx
                 * fails (i.e. actual failure or we're disconnecting) */
                kibnal_conn_addref(conn);
                rc = kibnal_post_rx (&conn->ibc_rxs[i], 0);
                if (rc != 0)
                        return rc;
        }

        return 0;
}

kib_tx_t *
kibnal_find_waiting_tx_locked(kib_conn_t *conn, int txtype, __u64 cookie)
{
        struct list_head   *tmp;

        list_for_each(tmp, &conn->ibc_active_txs) {
                kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);

                LASSERT (tx->tx_sending != 0 || tx->tx_waiting);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_waiting &&
                    tx->tx_msg->ibm_type == txtype)
                        return tx;

                CWARN("Bad completion: %swaiting, type %x (wanted %x)\n",
                      tx->tx_waiting ? "" : "NOT ",
                      tx->tx_msg->ibm_type, txtype);
        }
        return NULL;
}

void
kibnal_handle_completion(kib_conn_t *conn, int txtype, int status, __u64 cookie)
{
        kib_tx_t    *tx;
        int          idle;

        spin_lock(&conn->ibc_lock);

        tx = kibnal_find_waiting_tx_locked(conn, txtype, cookie);
        if (tx == NULL) {
                spin_unlock(&conn->ibc_lock);

                CWARN("Unmatched completion type %x cookie "LPX64
                      " from "LPX64"\n",
                      txtype, cookie, conn->ibc_peer->ibp_nid);
                kibnal_close_conn (conn, -EPROTO);
                return;
        }

        if (tx->tx_status == 0) {               /* success so far */
                if (status < 0) {               /* failed? */
                        tx->tx_status = status;
                } else if (txtype == IBNAL_MSG_GET_REQ) {
                        /* XXX layering violation: set REPLY data length */
                        LASSERT (tx->tx_libmsg[1] != NULL);
                        LASSERT (tx->tx_libmsg[1]->ev.type ==
                                 PTL_EVENT_REPLY_END);

                        tx->tx_libmsg[1]->ev.mlength = status;
                }
        }

        tx->tx_waiting = 0;

        idle = tx->tx_sending == 0;
        if (idle)
                list_del(&tx->tx_list);

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kibnal_tx_done(tx);
}

void
kibnal_send_completion (kib_conn_t *conn, int type, int status, __u64 cookie)
{
        kib_tx_t    *tx = kibnal_get_idle_tx(0);

        if (tx == NULL) {
                CERROR("Can't get tx for completion %x for "LPX64"\n",
                       type, conn->ibc_peer->ibp_nid);
                return;
        }

        tx->tx_msg->ibm_u.completion.ibcm_status = status;
        tx->tx_msg->ibm_u.completion.ibcm_cookie = cookie;
        kibnal_init_tx_msg(tx, type, sizeof(kib_completion_msg_t));

        kibnal_queue_tx(tx, conn);
}

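/* Dispatch a received message.  The RDMA protocols visible in the switch
 * below:
 * PUT: PUT_REQ -> (peer sets up the sink and replies) PUT_ACK -> RDMA write
 *      of the payload -> PUT_DONE; or PUT_NAK if the PUT didn't match or got
 *      truncated.
 * GET: GET_REQ -> RDMA write of the reply data -> GET_DONE (GET_DONE carries
 *      -ENODATA if the GET didn't match). */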
void
kibnal_handle_rx (kib_rx_t *rx)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        int           credits = msg->ibm_credits;
        kib_tx_t     *tx;
        int           rc;

        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);

        CDEBUG (D_NET, "Received %x[%d] from "LPX64"\n",
                msg->ibm_type, credits, conn->ibc_peer->ibp_nid);

        if (credits != 0) {
                /* Have I received credits that will let me send? */
                spin_lock(&conn->ibc_lock);
                conn->ibc_credits += credits;
                spin_unlock(&conn->ibc_lock);

                kibnal_check_sends(conn);
        }

        switch (msg->ibm_type) {
        default:
                CERROR("Bad IBNAL message type %x from "LPX64"\n",
                       msg->ibm_type, conn->ibc_peer->ibp_nid);
                break;

        case IBNAL_MSG_NOOP:
                break;

        case IBNAL_MSG_IMMEDIATE:
                lib_parse(&kibnal_lib, &msg->ibm_u.immediate.ibim_hdr, rx);
                break;

        case IBNAL_MSG_PUT_REQ:
                rx->rx_responded = 0;
                lib_parse(&kibnal_lib, &msg->ibm_u.putreq.ibprm_hdr, rx);
                if (rx->rx_responded)
                        break;

                /* I wasn't asked to transfer any payload data.  This happens
                 * if the PUT didn't match, or got truncated. */
                kibnal_send_completion(rx->rx_conn, IBNAL_MSG_PUT_NAK, 0,
                                       msg->ibm_u.putreq.ibprm_cookie);
                break;

        case IBNAL_MSG_PUT_NAK:
                CWARN ("PUT_NAK from "LPX64"\n", conn->ibc_peer->ibp_nid);
                kibnal_handle_completion(conn, IBNAL_MSG_PUT_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBNAL_MSG_PUT_ACK:
                spin_lock(&conn->ibc_lock);
                tx = kibnal_find_waiting_tx_locked(conn, IBNAL_MSG_PUT_REQ,
                                                   msg->ibm_u.putack.ibpam_src_cookie);
                if (tx != NULL)
                        list_del(&tx->tx_list);
                spin_unlock(&conn->ibc_lock);

                if (tx == NULL) {
                        CERROR("Unmatched PUT_ACK from "LPX64"\n",
                               conn->ibc_peer->ibp_nid);
                        kibnal_close_conn(conn, -EPROTO);
                        break;
                }

                LASSERT (tx->tx_waiting);
                /* CAVEAT EMPTOR: I could be racing with tx_complete, but...
                 * (a) I can overwrite tx_msg since my peer has received it!
                 * (b) while tx_waiting is set, tx_complete() won't touch it.
                 */

                tx->tx_nwrq = 0;                /* overwrite PUT_REQ */

                rc = kibnal_init_rdma(tx, IBNAL_MSG_PUT_DONE,
                                      kibnal_rd_size(&msg->ibm_u.putack.ibpam_rd),
                                      &msg->ibm_u.putack.ibpam_rd,
                                      msg->ibm_u.putack.ibpam_dst_cookie);
                if (rc < 0)
                        CERROR("Can't setup rdma for PUT to "LPX64": %d\n",
                               conn->ibc_peer->ibp_nid, rc);

                spin_lock(&conn->ibc_lock);
                if (tx->tx_status == 0 && rc < 0)
                        tx->tx_status = rc;
                tx->tx_waiting = 0;             /* clear waiting and queue atomically */
                kibnal_queue_tx_locked(tx, conn);
                spin_unlock(&conn->ibc_lock);
                break;

        case IBNAL_MSG_PUT_DONE:
                kibnal_handle_completion(conn, IBNAL_MSG_PUT_ACK,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBNAL_MSG_GET_REQ:
                rx->rx_responded = 0;
                lib_parse(&kibnal_lib, &msg->ibm_u.get.ibgm_hdr, rx);
                if (rx->rx_responded)           /* I responded to the GET_REQ */
                        break;
                /* NB GET didn't match (I'd have responded even with no payload
                 * data) */
                kibnal_send_completion(rx->rx_conn, IBNAL_MSG_GET_DONE, -ENODATA,
                                       msg->ibm_u.get.ibgm_cookie);
                break;

        case IBNAL_MSG_GET_DONE:
                kibnal_handle_completion(conn, IBNAL_MSG_GET_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;
        }

        kibnal_post_rx(rx, 1);
}

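/* Completion handler for a posted receive.  NB an rx that completes while the
 * connection is still establishing is parked on ibc_early_rxs (under the
 * global lock) and handled once the connection reaches the ESTABLISHED
 * state. */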
void
kibnal_rx_complete (kib_rx_t *rx, int nob, vv_comp_status_t vvrc)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        unsigned long flags;
        int           rc;

        CDEBUG (D_NET, "rx %p conn %p\n", rx, conn);
        LASSERT (rx->rx_posted);
        rx->rx_posted = 0;

        if (conn->ibc_state > IBNAL_CONN_ESTABLISHED)
                goto ignore;

        if (vvrc != vv_comp_status_success) {
                CERROR("Rx from "LPX64" failed: %d\n",
                       conn->ibc_peer->ibp_nid, vvrc);
                goto failed;
        }

        rc = kibnal_unpack_msg(msg, nob);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from "LPX64"\n",
                        rc, conn->ibc_peer->ibp_nid);
                goto failed;
        }

        if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
            msg->ibm_srcstamp != conn->ibc_incarnation ||
            msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid ||
            msg->ibm_dststamp != kibnal_data.kib_incarnation) {
                CERROR ("Stale rx from "LPX64"\n",
                        conn->ibc_peer->ibp_nid);
                goto failed;
        }

        /* racing with connection establishment/teardown! */

        if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
                /* must check holding global lock to eliminate race */
                if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                        list_add_tail(&rx->rx_list, &conn->ibc_early_rxs);
                        write_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                                flags);
                        return;
                }
                write_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                        flags);
        }
        kibnal_handle_rx(rx);
        return;

 failed:
        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        kibnal_close_conn(conn, -EIO);
 ignore:
        /* Don't re-post rx & drop its ref on conn */
        kibnal_conn_decref(conn);
}

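/* Two implementations of the RDMA descriptor setup follow.  With
 * IBNAL_WHOLE_MEM set, buffers are assumed to be covered by an existing
 * registration, so fragments only need their keys/addresses looked up
 * (vv_get_gen_mr_attrib); otherwise each tx maps a single contiguous region
 * with vv_mem_region_register() and unmaps it again in kibnal_tx_done(). */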
#if IBNAL_WHOLE_MEM
int
kibnal_append_rdfrag(kib_rdma_desc_t *rd, int active, struct page *page,
                     unsigned long page_offset, unsigned long len)
{
        kib_rdma_frag_t *frag = &rd->rd_frags[rd->rd_nfrag];
        vv_l_key_t       l_key;
        vv_r_key_t       r_key;
        void            *addr;
        void            *vaddr;
        vv_mem_reg_h_t   mem_h;
        vv_return_t      vvrc;

        if (rd->rd_nfrag >= IBNAL_MAX_RDMA_FRAGS) {
                CERROR ("Too many RDMA fragments\n");
                return -EMSGSIZE;
        }

        addr = (void *)(((unsigned long)kmap(page)) + page_offset);

        vvrc = vv_get_gen_mr_attrib(kibnal_data.kib_hca, addr,
                                    len, &mem_h, &l_key, &r_key);
        LASSERT (vvrc == vv_return_ok);

        kunmap(page);

        if (active) {
                if (rd->rd_nfrag == 0) {
                        rd->rd_key = l_key;
                } else if (l_key != rd->rd_key) {
                        CERROR ("> 1 key for single RDMA desc\n");
                        return -EINVAL;
                }
                vaddr = addr;
        } else {
                if (rd->rd_nfrag == 0) {
                        rd->rd_key = r_key;
                } else if (r_key != rd->rd_key) {
                        CERROR ("> 1 key for single RDMA desc\n");
                        return -EINVAL;
                }
                vv_va2advertise_addr(kibnal_data.kib_hca, addr, &vaddr);
        }

        kibnal_rf_set(frag, (unsigned long)vaddr, len);

        CDEBUG(D_NET,"map frag [%d][%d %x %08x%08x] %p\n",
               rd->rd_nfrag, frag->rf_nob, rd->rd_key,
               frag->rf_addr_hi, frag->rf_addr_lo, addr);

        rd->rd_nfrag++;
        return 0;
}

struct page *
kibnal_kvaddr_to_page (unsigned long vaddr)
{
        struct page *page;

        if (vaddr >= VMALLOC_START &&
            vaddr < VMALLOC_END)
                page = vmalloc_to_page ((void *)vaddr);
#ifdef CONFIG_HIGHMEM
        else if (vaddr >= PKMAP_BASE &&
                 vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
                page = vmalloc_to_page ((void *)vaddr);
        /* in 2.4 ^ just walks the page tables */
#endif
        else
                page = virt_to_page (vaddr);

        return VALID_PAGE(page) ? page : NULL;
}

int
kibnal_setup_rd_iov(kib_tx_t *tx, kib_rdma_desc_t *rd,
                    vv_access_con_bit_mask_t access,
                    int niov, struct iovec *iov, int offset, int nob)

{
        /* active if I'm sending */
        int           active = ((access & vv_acc_r_mem_write) == 0);
        int           fragnob;
        int           rc;
        unsigned long vaddr;
        struct page  *page;
        int           page_offset;

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT ((rd != tx->tx_rd) == !active);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        rd->rd_nfrag = 0;
        do {
                LASSERT (niov > 0);

                vaddr = ((unsigned long)iov->iov_base) + offset;
                page_offset = vaddr & (PAGE_SIZE - 1);
                page = kibnal_kvaddr_to_page(vaddr);
                if (page == NULL) {
                        CERROR ("Can't find page\n");
                        return -EFAULT;
                }

                fragnob = min((int)(iov->iov_len - offset), nob);
                fragnob = min(fragnob, (int)PAGE_SIZE - page_offset);

                rc = kibnal_append_rdfrag(rd, active, page,
                                          page_offset, fragnob);
                if (rc != 0)
                        return rc;

                if (offset + fragnob < iov->iov_len) {
                        offset += fragnob;
                } else {
                        offset = 0;
                        iov++;
                        niov--;
                }
                nob -= fragnob;
        } while (nob > 0);

        return 0;
}

int
kibnal_setup_rd_kiov (kib_tx_t *tx, kib_rdma_desc_t *rd,
                      vv_access_con_bit_mask_t access,
                      int nkiov, ptl_kiov_t *kiov, int offset, int nob)
{
        /* active if I'm sending */
        int            active = ((access & vv_acc_r_mem_write) == 0);
        int            fragnob;
        int            rc;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT ((rd != tx->tx_rd) == !active);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        rd->rd_nfrag = 0;
        do {
                LASSERT (nkiov > 0);
                fragnob = min((int)(kiov->kiov_len - offset), nob);

                rc = kibnal_append_rdfrag(rd, active, kiov->kiov_page,
                                          kiov->kiov_offset + offset,
                                          fragnob);
                if (rc != 0)
                        return rc;

                offset = 0;
                kiov++;
                nkiov--;
                nob -= fragnob;
        } while (nob > 0);

        return 0;
}
#else
int
kibnal_setup_rd_iov (kib_tx_t *tx, kib_rdma_desc_t *rd,
                     vv_access_con_bit_mask_t access,
                     int niov, struct iovec *iov, int offset, int nob)

{
        /* active if I'm sending */
        int         active = ((access & vv_acc_r_mem_write) == 0);
        void       *vaddr;
        vv_return_t vvrc;

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
        LASSERT ((rd != tx->tx_rd) == !active);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR ("Can't map multiple vaddr fragments\n");
                return (-EMSGSIZE);
        }

        vaddr = (void *)(((unsigned long)iov->iov_base) + offset);
        tx->tx_md.md_addr = (__u64)((unsigned long)vaddr);

        vvrc = vv_mem_region_register(kibnal_data.kib_hca, vaddr, nob,
                                      kibnal_data.kib_pd, access,
                                      &tx->tx_md.md_handle,
                                      &tx->tx_md.md_lkey,
                                      &tx->tx_md.md_rkey);
        if (vvrc != vv_return_ok) {
                CERROR ("Can't map vaddr %p: %d\n", vaddr, vvrc);
                return -EFAULT;
        }

        tx->tx_mapped = KIB_TX_MAPPED;

        rd->rd_key = active ? tx->tx_md.md_lkey : tx->tx_md.md_rkey;
        rd->rd_nfrag = 1;
        kibnal_rf_set(&rd->rd_frags[0], tx->tx_md.md_addr, nob);

        return (0);
}

int
kibnal_setup_rd_kiov (kib_tx_t *tx, kib_rdma_desc_t *rd,
                      vv_access_con_bit_mask_t access,
                      int nkiov, ptl_kiov_t *kiov, int offset, int nob)
{
        /* active if I'm sending */
        int            active = ((access & vv_acc_r_mem_write) == 0);
        vv_return_t    vvrc;
        vv_phy_list_t  phys_pages;
        vv_phy_buf_t  *phys;
        int            page_offset;
        int            nphys;
        int            resid;
        int            phys_size;
        int            rc;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
        LASSERT ((rd != tx->tx_rd) == !active);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        phys_size = nkiov * sizeof (*phys);
        PORTAL_ALLOC(phys, phys_size);
        if (phys == NULL) {
                CERROR ("Can't allocate tmp phys\n");
                return (-ENOMEM);
        }

        page_offset = kiov->kiov_offset + offset;

        phys[0].start = kibnal_page2phys(kiov->kiov_page);
        phys[0].size = PAGE_SIZE;

        nphys = 1;
        resid = nob - (kiov->kiov_len - offset);

        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        int i;
                        /* Can't have gaps */
                        CERROR ("Can't make payload contiguous in I/O VM:"
                                "page %d, offset %d, len %d \n", nphys,
                                kiov->kiov_offset, kiov->kiov_len);

                        for (i = -nphys; i < nkiov; i++)
                                CERROR("kiov[%d] %p +%d for %d\n",
                                       i, kiov[i].kiov_page,
                                       kiov[i].kiov_offset,
                                       kiov[i].kiov_len);

                        rc = -EINVAL;
                        goto out;
                }

                LASSERT (nphys * sizeof (*phys) < phys_size);
                phys[nphys].start = kibnal_page2phys(kiov->kiov_page);
                phys[nphys].size = PAGE_SIZE;

                nphys++;
                resid -= PAGE_SIZE;
        }

#if 0
        CWARN ("nphys %d, nob %d, page_offset %d\n", nphys, nob, page_offset);
        for (i = 0; i < nphys; i++)
                CWARN ("   [%d] "LPX64"\n", i, phys[i]);
#endif

        vvrc = vv_phy_mem_region_register(kibnal_data.kib_hca,
                                          &phys_pages,
                                          IBNAL_RDMA_BASE,
                                          nphys,
                                          page_offset,
                                          kibnal_data.kib_pd,
                                          access,
                                          &tx->tx_md.md_handle,
                                          &tx->tx_md.md_addr,
                                          &tx->tx_md.md_lkey,
                                          &tx->tx_md.md_rkey);

        if (vvrc != vv_return_ok) {
                CERROR ("Can't map phys: %d\n", vvrc);
                rc = -EFAULT;
                goto out;
        }

        CDEBUG(D_NET, "Mapped %d pages %d bytes @ offset %d: "
               "lkey %x, rkey %x, addr "LPX64"\n",
               nphys, nob, page_offset, tx->tx_md.md_lkey, tx->tx_md.md_rkey,
               tx->tx_md.md_addr);

        tx->tx_mapped = KIB_TX_MAPPED;
        rc = 0;

        rd->rd_key = active ? tx->tx_md.md_lkey : tx->tx_md.md_rkey;
        rd->rd_nfrag = 1;
        kibnal_rf_set(&rd->rd_frags[0], tx->tx_md.md_addr, nob);

 out:
        PORTAL_FREE(phys, phys_size);
        return (rc);
}
#endif

kib_conn_t *
kibnal_find_conn_locked (kib_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->ibp_conns) {
                return (list_entry(tmp, kib_conn_t, ibc_list));
        }

        return (NULL);
}

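/* Work the send queue.  A tx may be posted only if (a) fewer than
 * IBNAL_MSG_QUEUE_SIZE sends are already outstanding, (b) I hold a send
 * credit, and (c) I'm not about to burn my last credit while owing none back
 * (that one is reserved so credits can always be returned).  A NOOP is
 * generated when credits pile up with nothing queued to carry them. */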
void
kibnal_check_sends (kib_conn_t *conn)
{
        kib_tx_t       *tx;
        vv_return_t     vvrc;
        int             rc;
        int             i;
        int             done;

        /* Don't send anything until after the connection is established */
        if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                CDEBUG(D_NET, LPX64": too soon\n", conn->ibc_peer->ibp_nid);
                return;
        }

        spin_lock(&conn->ibc_lock);

        LASSERT (conn->ibc_nsends_posted <= IBNAL_MSG_QUEUE_SIZE);

        if (list_empty(&conn->ibc_tx_queue) &&
            conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER) {
                spin_unlock(&conn->ibc_lock);

                tx = kibnal_get_idle_tx(0);     /* don't block */
                if (tx != NULL)
                        kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);

                spin_lock(&conn->ibc_lock);

                if (tx != NULL)
                        kibnal_queue_tx_locked(tx, conn);
        }

        while (!list_empty (&conn->ibc_tx_queue)) {
                tx = list_entry (conn->ibc_tx_queue.next, kib_tx_t, tx_list);

                /* We rely on this for QP sizing */
                LASSERT (tx->tx_nwrq > 0 && tx->tx_nwrq <= 1 + IBNAL_MAX_RDMA_FRAGS);

                LASSERT (conn->ibc_outstanding_credits >= 0);
                LASSERT (conn->ibc_outstanding_credits <= IBNAL_MSG_QUEUE_SIZE);
                LASSERT (conn->ibc_credits >= 0);
                LASSERT (conn->ibc_credits <= IBNAL_MSG_QUEUE_SIZE);

                if (conn->ibc_nsends_posted == IBNAL_MSG_QUEUE_SIZE) {
                        CDEBUG(D_NET, LPX64": posted enough\n",
                               conn->ibc_peer->ibp_nid);
                        break;
                }

                if (conn->ibc_credits == 0) {   /* no credits */
                        CDEBUG(D_NET, LPX64": no credits\n",
                               conn->ibc_peer->ibp_nid);
                        break;
                }

                if (conn->ibc_credits == 1 &&   /* last credit reserved for */
                    conn->ibc_outstanding_credits == 0) { /* giving back credits */
                        CDEBUG(D_NET, LPX64": not using last credit\n",
                               conn->ibc_peer->ibp_nid);
                        break;
                }

                list_del (&tx->tx_list);

                if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
                    (!list_empty(&conn->ibc_tx_queue) ||
                     conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER)) {
                        /* redundant NOOP */
                        spin_unlock(&conn->ibc_lock);
                        kibnal_tx_done(tx);
                        spin_lock(&conn->ibc_lock);
                        CDEBUG(D_NET, LPX64": redundant noop\n",
                               conn->ibc_peer->ibp_nid);
                        continue;
                }

                kibnal_pack_msg(tx->tx_msg, conn->ibc_outstanding_credits,
                                conn->ibc_peer->ibp_nid, conn->ibc_incarnation);

                conn->ibc_outstanding_credits = 0;
                conn->ibc_nsends_posted++;
                conn->ibc_credits--;

                /* CAVEAT EMPTOR!  This tx could be the PUT_DONE of an RDMA
                 * PUT.  If so, it was first queued here as a PUT_REQ, sent and
                 * stashed on ibc_active_txs, matched by an incoming PUT_ACK,
                 * and then re-queued here.  It's (just) possible that
                 * tx_sending is non-zero if we've not done the tx_complete() from
                 * the first send; hence the += rather than = below. */
                tx->tx_sending++;

                list_add (&tx->tx_list, &conn->ibc_active_txs);

                /* Keep holding ibc_lock while posting sends on this
                 * connection; vv_post_send() isn't re-entrant on the same
                 * QP!! */

                LASSERT (tx->tx_nwrq > 0);

                rc = -ECONNABORTED;
                vvrc = vv_return_ok;
                if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                        tx->tx_status = 0;
#if 1
                        vvrc = vv_post_send_list(kibnal_data.kib_hca,
                                                 conn->ibc_qp,
                                                 tx->tx_nwrq,
                                                 tx->tx_wrq,
                                                 vv_operation_type_send_rc);
                        rc = (vvrc == vv_return_ok) ? 0 : -EIO;
#else
                        /* Only post 1 item at a time for now (so we know
                         * exactly how many got posted successfully) */
                        for (i = 0; i < tx->tx_nwrq; i++) {
                                switch (tx->tx_wrq[i].wr_type) {
                                case vv_wr_send:
                                        CDEBUG(D_NET, "[%d]posting send [%d %x %p]%s: %x\n",
                                               i,
                                               tx->tx_wrq[i].scatgat_list->length,
                                               tx->tx_wrq[i].scatgat_list->l_key,
                                               tx->tx_wrq[i].scatgat_list->v_address,
                                               tx->tx_wrq[i].type.send.send_qp_type.rc_type.fance_indicator ?
                                               "(fence)":"",
                                               tx->tx_msg->ibm_type);
                                        break;
                                case vv_wr_rdma_write:
                                        CDEBUG(D_NET, "[%d]posting PUT  [%d %x %p]->[%x "LPX64"]\n",
                                               i,
                                               tx->tx_wrq[i].scatgat_list->length,
                                               tx->tx_wrq[i].scatgat_list->l_key,
                                               tx->tx_wrq[i].scatgat_list->v_address,
                                               tx->tx_wrq[i].type.send.send_qp_type.rc_type.r_r_key,
                                               tx->tx_wrq[i].type.send.send_qp_type.rc_type.r_addr);
                                        break;
                                case vv_wr_rdma_read:
                                        CDEBUG(D_NET, "[%d]posting GET  [%d %x %p]->[%x "LPX64"]\n",
                                               i,
                                               tx->tx_wrq[i].scatgat_list->length,
                                               tx->tx_wrq[i].scatgat_list->l_key,
                                               tx->tx_wrq[i].scatgat_list->v_address,
                                               tx->tx_wrq[i].type.send.send_qp_type.rc_type.r_r_key,
                                               tx->tx_wrq[i].type.send.send_qp_type.rc_type.r_addr);
                                        break;
                                default:
                                        LBUG();
                                }
                                vvrc = vv_post_send(kibnal_data.kib_hca,
                                                    conn->ibc_qp,
                                                    &tx->tx_wrq[i],
                                                    vv_operation_type_send_rc);
                                CDEBUG(D_NET, LPX64": post %d/%d\n",
                                       conn->ibc_peer->ibp_nid, i, tx->tx_nwrq);
                                if (vvrc != vv_return_ok) {
                                        rc = -EIO;
                                        break;
                                }
                        }
#endif
                }

                if (rc != 0) {
                        /* NB credits are transferred in the actual
                         * message, which can only be the last work item */
                        conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
                        conn->ibc_credits++;
                        conn->ibc_nsends_posted--;

                        tx->tx_status = rc;
                        tx->tx_waiting = 0;
                        tx->tx_sending--;

                        done = (tx->tx_sending == 0);
                        if (done)
                                list_del (&tx->tx_list);

                        spin_unlock(&conn->ibc_lock);

                        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
                                CERROR ("Error %d posting transmit to "LPX64"\n",
                                        rc, conn->ibc_peer->ibp_nid);
                        else
                                CDEBUG (D_NET, "Error %d posting transmit to "
                                        LPX64"\n", rc, conn->ibc_peer->ibp_nid);

                        kibnal_close_conn (conn, rc);

                        if (done)
                                kibnal_tx_done (tx);
                        return;
                }
        }

        spin_unlock(&conn->ibc_lock);
}

void
kibnal_tx_complete (kib_tx_t *tx, int final_send, vv_comp_status_t vvrc)
{
        kib_conn_t   *conn = tx->tx_conn;
        int           failed = (vvrc != vv_comp_status_success);
        int           idle;

        CDEBUG(D_NET, "conn %p tx %p [%d/%d]: %d\n", conn, tx,
               tx->tx_sending, tx->tx_nwrq, vvrc);
        LASSERT (tx->tx_sending != 0);

        if (failed &&
            conn->ibc_state == IBNAL_CONN_ESTABLISHED)
                CERROR ("Tx completion to "LPX64" failed: %d\n",
                        conn->ibc_peer->ibp_nid, vvrc);

        /* I should only get RDMA notifications of errors */
        LASSERT (final_send || failed);

        spin_lock(&conn->ibc_lock);

        /* I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'. */

        if (final_send)                         /* this is the last work item */
                tx->tx_sending--;

        if (failed) {
                tx->tx_waiting = 0;
                tx->tx_status = -EIO;
        }

        idle = (tx->tx_sending == 0) &&         /* This is the final callback */
               !tx->tx_waiting;                 /* Not waiting for peer */
        if (idle)
                list_del(&tx->tx_list);

        kibnal_conn_addref(conn);               /* 1 ref for me.... */

        if (tx->tx_sending == 0)
                conn->ibc_nsends_posted--;

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kibnal_tx_done (tx);

        if (failed)
                kibnal_close_conn (conn, -EIO);
        else
                kibnal_check_sends(conn);

        kibnal_conn_decref(conn);               /* ...until here */
}

void
kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob)
{
        vv_scatgat_t *gl = &tx->tx_gl[tx->tx_nwrq];
        vv_wr_t      *wrq = &tx->tx_wrq[tx->tx_nwrq];
        int           nob = offsetof (kib_msg_t, ibm_u) + body_nob;

        LASSERT (tx->tx_nwrq >= 0 &&
                 tx->tx_nwrq < (1 + IBNAL_MAX_RDMA_FRAGS));
        LASSERT (nob <= IBNAL_MSG_SIZE);

        kibnal_init_msg(tx->tx_msg, type, body_nob);

        *gl = (vv_scatgat_t) {
                .v_address = (void *)((unsigned long)KIBNAL_TX_VADDR(tx)),
                .l_key     = KIBNAL_TX_LKEY(tx),
                .length    = nob,
        };

        memset(wrq, 0, sizeof(*wrq));

        wrq->wr_id = (unsigned long)tx;
        wrq->wr_type = vv_wr_send;
        wrq->scatgat_list = gl;
        wrq->num_of_data_segments = 1;
        wrq->completion_notification = 1;
        wrq->type.send.solicited_event = 1;
        wrq->type.send.immidiate_data_indicator = 0;
        wrq->type.send.send_qp_type.rc_type.fance_indicator = 0;

        tx->tx_nwrq++;
}

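/* Build the work request chain for an RDMA transfer: one RDMA-write work
 * request per overlapping piece of the source/destination frag lists
 * (consuming both frag lists as it goes), after which the caller's
 * PUT_DONE/GET_DONE completion message is appended as a normal send via
 * kibnal_init_tx_msg(). */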
int
kibnal_init_rdma (kib_tx_t *tx, int type, int nob,
                  kib_rdma_desc_t *dstrd, __u64 dstcookie)
{
        /* CAVEAT EMPTOR: this 'consumes' the frags in 'dstrd' */
        int              resid = nob;
        kib_msg_t       *ibmsg = tx->tx_msg;
        kib_rdma_desc_t *srcrd = tx->tx_rd;
        kib_rdma_frag_t *srcfrag;
        int              srcidx;
        kib_rdma_frag_t *dstfrag;
        int              dstidx;
        vv_scatgat_t    *gl;
        vv_wr_t         *wrq;
        int              wrknob;
        int              rc;

        /* Called by scheduler */
        LASSERT (!in_interrupt());

        LASSERT (type == IBNAL_MSG_GET_DONE ||
                 type == IBNAL_MSG_PUT_DONE);

        srcidx = dstidx = 0;
        srcfrag = &srcrd->rd_frags[0];
        dstfrag = &dstrd->rd_frags[0];
        rc = resid;

        while (resid > 0) {
                if (srcidx >= srcrd->rd_nfrag) {
                        CERROR("Src buffer exhausted: %d frags\n", srcidx);
                        rc = -EPROTO;
                        break;
                }

                if (dstidx == dstrd->rd_nfrag) {
                        CERROR("Dst buffer exhausted: %d frags\n", dstidx);
                        rc = -EPROTO;
                        break;
                }

                if (tx->tx_nwrq == IBNAL_MAX_RDMA_FRAGS) {
                        CERROR("RDMA too fragmented: %d/%d src %d/%d dst frags\n",
                               srcidx, srcrd->rd_nfrag,
                               dstidx, dstrd->rd_nfrag);
                        rc = -EMSGSIZE;
                        break;
                }

                wrknob = MIN(MIN(srcfrag->rf_nob, dstfrag->rf_nob), resid);

                gl = &tx->tx_gl[tx->tx_nwrq];
                gl->v_address = (void *)((unsigned long)kibnal_rf_addr(srcfrag));
                gl->length    = wrknob;
                gl->l_key     = srcrd->rd_key;

                wrq = &tx->tx_wrq[tx->tx_nwrq];
                wrq->wr_id = (unsigned long)tx;
                /* All frags give completion until we've sussed how to submit
                 * all frags + completion message and only (but reliably) get
                 * notification on the completion message */
                wrq->completion_notification = 0;
                wrq->scatgat_list = gl;
                wrq->num_of_data_segments = 1;
                wrq->wr_type = vv_wr_rdma_write;
                wrq->type.send.solicited_event = 0;
                wrq->type.send.send_qp_type.rc_type.fance_indicator = 0;
                wrq->type.send.send_qp_type.rc_type.r_addr = kibnal_rf_addr(dstfrag);
                wrq->type.send.send_qp_type.rc_type.r_r_key = dstrd->rd_key;

                resid -= wrknob;
                if (wrknob < srcfrag->rf_nob) {
                        kibnal_rf_set(srcfrag,
                                      kibnal_rf_addr(srcfrag) + wrknob,
                                      srcfrag->rf_nob - wrknob);
                } else {
                        srcfrag++;
                        srcidx++;
                }

                if (wrknob < dstfrag->rf_nob) {
                        kibnal_rf_set(dstfrag,
                                      kibnal_rf_addr(dstfrag) + wrknob,
                                      dstfrag->rf_nob - wrknob);
                } else {
                        dstfrag++;
                        dstidx++;
                }

                tx->tx_nwrq++;
        }

        if (rc < 0)                             /* no RDMA if completing with failure */
                tx->tx_nwrq = 0;

        ibmsg->ibm_u.completion.ibcm_status = rc;
        ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;
        kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));

        return rc;
}

void
kibnal_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
{
        spin_lock(&conn->ibc_lock);
        kibnal_queue_tx_locked (tx, conn);
        spin_unlock(&conn->ibc_lock);

        kibnal_check_sends(conn);
}

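/* Find (or initiate) a connection to 'nid' and queue 'tx' on it.  The peer
 * table is scanned under the read lock first; if a connection must be made,
 * the read lock is dropped and the write lock taken (interrupts stay disabled
 * across the switch), so the peer/conn lookups are repeated since the state
 * may have changed in between. */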
void
kibnal_launch_tx (kib_tx_t *tx, ptl_nid_t nid)
{
        kib_peer_t      *peer;
        kib_conn_t      *conn;
        unsigned long    flags;
        rwlock_t        *g_lock = &kibnal_data.kib_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
        LASSERT (tx->tx_nwrq > 0);              /* work items have been set up */

        read_lock_irqsave(g_lock, flags);

        peer = kibnal_find_peer_locked (nid);
        if (peer == NULL) {
                read_unlock_irqrestore(g_lock, flags);
                tx->tx_status = -EHOSTUNREACH;
                kibnal_tx_done (tx);
                return;
        }

        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                kibnal_conn_addref(conn);       /* 1 ref for me... */
                read_unlock_irqrestore(g_lock, flags);

                kibnal_queue_tx (tx, conn);
                kibnal_conn_decref(conn);       /* ...to here */
                return;
        }

        /* Making one or more connections; I'll need a write lock... */
        read_unlock(g_lock);
        write_lock(g_lock);

        peer = kibnal_find_peer_locked (nid);
        if (peer == NULL) {
                write_unlock_irqrestore(g_lock, flags);
                tx->tx_status = -EHOSTUNREACH;
                kibnal_tx_done (tx);
                return;
        }

        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kibnal_conn_addref(conn);       /* 1 ref for me... */
                write_unlock_irqrestore(g_lock, flags);

                kibnal_queue_tx (tx, conn);
                kibnal_conn_decref(conn);       /* ...until here */
                return;
        }

        if (peer->ibp_connecting == 0) {
                if (!time_after_eq(jiffies, peer->ibp_reconnect_time)) {
                        write_unlock_irqrestore(g_lock, flags);
                        tx->tx_status = -EHOSTUNREACH;
                        kibnal_tx_done (tx);
                        return;
                }

                peer->ibp_connecting = 1;
                kibnal_peer_addref(peer); /* extra ref for connd */

                spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);

                list_add_tail (&peer->ibp_connd_list,
                               &kibnal_data.kib_connd_peers);
                wake_up (&kibnal_data.kib_connd_waitq);

                spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
        }

        /* A connection is being established; queue the message... */
        list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}

int
kibnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
{
        /* I would guess that if kibnal_get_peer (nid) == NULL,
           and we're not routing, then 'nid' is very distant :) */
        if ( nal->libnal_ni.ni_pid.nid == nid ) {
                *dist = 0;
        } else {
                *dist = 1;
        }

        return 0;
}

ptl_err_t
kibnal_sendmsg(lib_nal_t    *nal,
               void         *private,
               lib_msg_t    *libmsg,
               ptl_hdr_t    *hdr,
               int           type,
               ptl_nid_t     nid,
               ptl_pid_t     pid,
               unsigned int  payload_niov,
               struct iovec *payload_iov,
               ptl_kiov_t   *payload_kiov,
               int           payload_offset,
               int           payload_nob)
{
        kib_msg_t  *ibmsg;
        kib_tx_t   *tx;
        int         nob;
        int         rc;
        int         n;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending %d bytes in %d frags to nid:"LPX64
               " pid %d\n", payload_nob, payload_niov, nid, pid);

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= PTL_MD_MAX_IOV);

        /* Thread context */
        LASSERT (!in_interrupt());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        switch (type) {
        default:
                LBUG();
                return (PTL_FAIL);

        case PTL_MSG_REPLY: {
                /* reply's 'private' is the incoming receive */
                kib_rx_t *rx = private;

                LASSERT(rx != NULL);

                if (rx->rx_msg->ibm_type == IBNAL_MSG_IMMEDIATE) {
                        /* RDMA not expected */
                        nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
                        if (nob > IBNAL_MSG_SIZE) {
                                CERROR("Can't REPLY IMMEDIATE %d to "LPX64
                                       " (RDMA not requested): payload %d"
                                       " exceeds max message size %d\n",
                                       nob, nid, payload_nob, IBNAL_MSG_SIZE);
                                return PTL_FAIL;
                        }
                        break;
                }

                /* Incoming message consistent with RDMA? */
                if (rx->rx_msg->ibm_type != IBNAL_MSG_GET_REQ) {
                        CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
                               nid, rx->rx_msg->ibm_type);
                        return PTL_FAIL;
                }

                /* NB rx_complete() will send GET_NAK when I return to it from
                 * here, unless I set rx_responded! */

                tx = kibnal_get_idle_tx(0);
                if (tx == NULL) {
                        CERROR("Can't get tx for REPLY to "LPX64"\n", nid);
                        return PTL_FAIL;
                }

                if (payload_nob == 0)
                        rc = 0;
                else if (payload_kiov == NULL)
                        rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 0,
                                                 payload_niov, payload_iov,
                                                 payload_offset, payload_nob);
                else
                        rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 0,
                                                  payload_niov, payload_kiov,
                                                  payload_offset, payload_nob);
                if (rc != 0) {
                        CERROR("Can't setup GET src for "LPX64": %d\n", nid, rc);
                        kibnal_tx_done(tx);
                        return PTL_FAIL;
                }

                rc = kibnal_init_rdma(tx, IBNAL_MSG_GET_DONE, payload_nob,
                                      &rx->rx_msg->ibm_u.get.ibgm_rd,
                                      rx->rx_msg->ibm_u.get.ibgm_cookie);
                if (rc < 0) {
                        CERROR("Can't setup rdma for GET from "LPX64": %d\n",
                               nid, rc);
                } else if (rc == 0) {
                        /* No RDMA: local completion may happen now! */
                        lib_finalize (&kibnal_lib, NULL, libmsg, PTL_OK);
                } else {
                        /* RDMA: lib_finalize(libmsg) when it completes */
                        tx->tx_libmsg[0] = libmsg;
                }

                kibnal_queue_tx(tx, rx->rx_conn);
                rx->rx_responded = 1;
                return (rc >= 0) ? PTL_OK : PTL_FAIL;
        }

        case PTL_MSG_GET:
                /* will the REPLY message be small enough not to need RDMA? */
                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[libmsg->md->length]);
                if (nob <= IBNAL_MSG_SIZE)
                        break;

                tx = kibnal_get_idle_tx(1);     /* may block; caller is an app thread */
                LASSERT (tx != NULL);

                ibmsg = tx->tx_msg;
                ibmsg->ibm_u.get.ibgm_hdr = *hdr;
                ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie;

                if ((libmsg->md->options & PTL_MD_KIOV) == 0)
                        rc = kibnal_setup_rd_iov(tx, &ibmsg->ibm_u.get.ibgm_rd,
                                                 vv_acc_r_mem_write,
                                                 libmsg->md->md_niov,
                                                 libmsg->md->md_iov.iov,
                                                 0, libmsg->md->length);
                else
                        rc = kibnal_setup_rd_kiov(tx, &ibmsg->ibm_u.get.ibgm_rd,
                                                  vv_acc_r_mem_write,
                                                  libmsg->md->md_niov,
                                                  libmsg->md->md_iov.kiov,
1484                                                   0, libmsg->md->length);
1485                 if (rc != 0) {
1486                         CERROR("Can't setup GET sink for "LPX64": %d\n", nid, rc);
1487                         kibnal_tx_done(tx);
1488                         return PTL_FAIL;
1489                 }
1490
1491                 n = ibmsg->ibm_u.get.ibgm_rd.rd_nfrag;
1492                 nob = offsetof(kib_get_msg_t, ibgm_rd.rd_frags[n]);
1493                 kibnal_init_tx_msg(tx, IBNAL_MSG_GET_REQ, nob);
1494
1495                 tx->tx_libmsg[1] = lib_create_reply_msg(&kibnal_lib, nid, libmsg);
1496                 if (tx->tx_libmsg[1] == NULL) {
1497                         CERROR("Can't create reply for GET -> "LPX64"\n", nid);
1498                         kibnal_tx_done(tx);
1499                         return PTL_FAIL;
1500                 }
1501
1502                 tx->tx_libmsg[0] = libmsg;      /* finalise libmsg[0,1] on completion */
1503                 tx->tx_waiting = 1;             /* waiting for GET_DONE */
1504                 kibnal_launch_tx(tx, nid);
1505                 return PTL_OK;
1506
1507         case PTL_MSG_ACK:
1508                 LASSERT (payload_nob == 0);
1509                 break;
1510
1511         case PTL_MSG_PUT:
1512                 /* Is the payload small enough not to need RDMA? */
1513                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1514                 if (nob <= IBNAL_MSG_SIZE)
1515                         break;
1516
1517                 tx = kibnal_get_idle_tx(1);     /* may block: caller is app thread */
1518                 LASSERT (tx != NULL);
1519
1520                 if (payload_kiov == NULL)
1521                         rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 0,
1522                                                  payload_niov, payload_iov,
1523                                                  payload_offset, payload_nob);
1524                 else
1525                         rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 0,
1526                                                   payload_niov, payload_kiov,
1527                                                   payload_offset, payload_nob);
1528                 if (rc != 0) {
1529                         CERROR("Can't setup PUT src for "LPX64": %d\n", nid, rc);
1530                         kibnal_tx_done(tx);
1531                         return PTL_FAIL;
1532                 }
1533
1534                 ibmsg = tx->tx_msg;
1535                 ibmsg->ibm_u.putreq.ibprm_hdr = *hdr;
1536                 ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie;
1537                 kibnal_init_tx_msg(tx, IBNAL_MSG_PUT_REQ, sizeof(kib_putreq_msg_t));
1538
1539                 tx->tx_libmsg[0] = libmsg;      /* finalise libmsg on completion */
1540                 tx->tx_waiting = 1;             /* waiting for PUT_{ACK,NAK} */
1541                 kibnal_launch_tx(tx, nid);
1542                 return PTL_OK;
1543         }
1544
1545         LASSERT (offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob])
1546                  <= IBNAL_MSG_SIZE);
1547
1548         tx = kibnal_get_idle_tx(!(type == PTL_MSG_ACK ||
1549                                   type == PTL_MSG_REPLY));
1550         if (tx == NULL) {
1551                 CERROR ("Can't send %d to "LPX64": tx descs exhausted\n", type, nid);
1552                 return PTL_NO_SPACE;
1553         }
1554
1555         ibmsg = tx->tx_msg;
1556         ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1557
1558         if (payload_nob > 0) {
1559                 if (payload_kiov != NULL)
1560                         lib_copy_kiov2buf(ibmsg->ibm_u.immediate.ibim_payload,
1561                                           payload_niov, payload_kiov,
1562                                           payload_offset, payload_nob);
1563                 else
1564                         lib_copy_iov2buf(ibmsg->ibm_u.immediate.ibim_payload,
1565                                          payload_niov, payload_iov,
1566                                          payload_offset, payload_nob);
1567         }
1568
1569         nob = offsetof(kib_immediate_msg_t, ibim_payload[payload_nob]);
1570         kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE, nob);
1571
1572         tx->tx_libmsg[0] = libmsg;              /* finalise libmsg on completion */
1573         kibnal_launch_tx(tx, nid);
1574         return PTL_OK;
1575 }
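
/* Editor's note: a minimal sketch (not in the original source) of the
 * "fits as IMMEDIATE?" test that kibnal_sendmsg() applies above to REPLY,
 * GET and PUT alike: a payload travels inline iff the whole kib_msg_t,
 * payload included, fits in one pre-posted message buffer; anything
 * bigger takes the RDMA rendezvous path instead. */
static inline int
kibnal_payload_fits_immediate (int payload_nob)
{
        return (offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob])
                <= IBNAL_MSG_SIZE);
}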
1576
1577 ptl_err_t
1578 kibnal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
1579                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1580                unsigned int payload_niov, struct iovec *payload_iov,
1581                size_t payload_offset, size_t payload_len)
1582 {
1583         CDEBUG(D_NET, "  pid = %d, nid="LPU64"\n",
1584                pid, nid);
1585         return (kibnal_sendmsg(nal, private, cookie,
1586                                hdr, type, nid, pid,
1587                                payload_niov, payload_iov, NULL,
1588                                payload_offset, payload_len));
1589 }
1590
1591 ptl_err_t
1592 kibnal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, 
1593                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1594                      unsigned int payload_niov, ptl_kiov_t *payload_kiov, 
1595                      size_t payload_offset, size_t payload_len)
1596 {
1597         return (kibnal_sendmsg(nal, private, cookie,
1598                                hdr, type, nid, pid,
1599                                payload_niov, NULL, payload_kiov,
1600                                payload_offset, payload_len));
1601 }
1602
1603 ptl_err_t
1604 kibnal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
1605                  unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
1606                  size_t offset, int mlen, int rlen)
1607 {
1608         kib_rx_t    *rx = private;
1609         kib_msg_t   *rxmsg = rx->rx_msg;
1610         kib_conn_t  *conn = rx->rx_conn;
1611         kib_tx_t    *tx;
1612         kib_msg_t   *txmsg;
1613         int          nob;
1614         int          rc;
1615         int          n;
1616         
1617         LASSERT (mlen <= rlen);
1618         LASSERT (mlen >= 0);
1619         LASSERT (!in_interrupt());
1620         /* Either all pages or all vaddrs */
1621         LASSERT (!(kiov != NULL && iov != NULL));
1622
1623         switch (rxmsg->ibm_type) {
1624         default:
1625                 LBUG();
1626                 
1627         case IBNAL_MSG_IMMEDIATE:
1628                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
1629                 if (nob > IBNAL_MSG_SIZE) {
1630                         CERROR ("Immediate message from "LPX64" too big: %d\n",
1631                                 rxmsg->ibm_u.immediate.ibim_hdr.src_nid, rlen);
1632                         return (PTL_FAIL);
1633                 }
1634
1635                 if (kiov != NULL)
1636                         lib_copy_buf2kiov(niov, kiov, offset,
1637                                           rxmsg->ibm_u.immediate.ibim_payload,
1638                                           mlen);
1639                 else
1640                         lib_copy_buf2iov(niov, iov, offset,
1641                                          rxmsg->ibm_u.immediate.ibim_payload,
1642                                          mlen);
1643
1644                 lib_finalize (nal, NULL, libmsg, PTL_OK);
1645                 return (PTL_OK);
1646
1647         case IBNAL_MSG_PUT_REQ:
1648                 /* NB rx_complete() will send PUT_NAK when I return to it from
1649                  * here, unless I set rx_responded!  */
1650
1651                 if (mlen == 0) { /* No payload to RDMA */
1652                         lib_finalize(nal, NULL, libmsg, PTL_OK);
1653                         return PTL_OK;
1654                 }
1655
1656                 tx = kibnal_get_idle_tx(0);
1657                 if (tx == NULL) {
1658                         CERROR("Can't allocate tx for "LPX64"\n",
1659                                conn->ibc_peer->ibp_nid);
1660                         return PTL_FAIL;
1661                 }
1662
1663                 txmsg = tx->tx_msg;
1664                 if (kiov == NULL)
1665                         rc = kibnal_setup_rd_iov(tx, 
1666                                                  &txmsg->ibm_u.putack.ibpam_rd,
1667                                                  vv_acc_r_mem_write,
1668                                                  niov, iov, offset, mlen);
1669                 else
1670                         rc = kibnal_setup_rd_kiov(tx,
1671                                                   &txmsg->ibm_u.putack.ibpam_rd,
1672                                                   vv_acc_r_mem_write,
1673                                                   niov, kiov, offset, mlen);
1674                 if (rc != 0) {
1675                         CERROR("Can't setup PUT sink for "LPX64": %d\n",
1676                                conn->ibc_peer->ibp_nid, rc);
1677                         kibnal_tx_done(tx);
1678                         return PTL_FAIL;
1679                 }
1680
1681                 txmsg->ibm_u.putack.ibpam_src_cookie = rxmsg->ibm_u.putreq.ibprm_cookie;
1682                 txmsg->ibm_u.putack.ibpam_dst_cookie = tx->tx_cookie;
1683
1684                 n = tx->tx_msg->ibm_u.putack.ibpam_rd.rd_nfrag;
1685                 nob = offsetof(kib_putack_msg_t, ibpam_rd.rd_frags[n]);
1686                 kibnal_init_tx_msg(tx, IBNAL_MSG_PUT_ACK, nob);
1687
1688                 tx->tx_libmsg[0] = libmsg;      /* finalise libmsg on completion */
1689                 tx->tx_waiting = 1;             /* waiting for PUT_DONE */
1690                 kibnal_queue_tx(tx, conn);
1691
1692                 LASSERT (!rx->rx_responded);
1693                 rx->rx_responded = 1;
1694                 return PTL_OK;
1695
1696         case IBNAL_MSG_GET_REQ:
1697                 /* We get called here just to discard any junk after the
1698                  * GET hdr (lib_finalize copes with the NULL libmsg). */
1699                 LASSERT (libmsg == NULL);
1700                 lib_finalize (nal, NULL, libmsg, PTL_OK);
1701                 return (PTL_OK);
1702         }
1703 }
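
/* Editor's note: a sketch (not from the original source) of the cookie
 * pairing behind the PUT_REQ case above.  The PUT_ACK carries two
 * cookies: ibpam_src_cookie echoes the requester's ibprm_cookie so the
 * requester can match the ACK to its pending tx, and ibpam_dst_cookie is
 * our own tx_cookie so the eventual PUT_DONE (assumed to work like the
 * GET_DONE flow in kibnal_sendmsg) can be matched back to the tx queued
 * here:
 *
 *      requester                       this node
 *      PUT_REQ { ibprm_cookie = A } ->
 *                                   <- PUT_ACK { src = A, dst = B }
 *      RDMA write, then
 *      PUT_DONE { cookie = B }      -> complete tx B, lib_finalize()
 */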
1704
1705 ptl_err_t
1706 kibnal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
1707               unsigned int niov, struct iovec *iov, 
1708               size_t offset, size_t mlen, size_t rlen)
1709 {
1710         return (kibnal_recvmsg (nal, private, msg, niov, iov, NULL,
1711                                 offset, mlen, rlen));
1712 }
1713
1714 ptl_err_t
1715 kibnal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
1716                      unsigned int niov, ptl_kiov_t *kiov, 
1717                      size_t offset, size_t mlen, size_t rlen)
1718 {
1719         return (kibnal_recvmsg (nal, private, msg, niov, NULL, kiov,
1720                                 offset, mlen, rlen));
1721 }
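
/* Editor's note: kibnal_send/send_pages and kibnal_recv/recv_pages exist
 * only to select the iov vs kiov argument of kibnal_sendmsg() and
 * kibnal_recvmsg().  A sketch of how they plug into the NAL callback
 * table (the real initialiser lives elsewhere in this module; field
 * names assumed from the portals lib_nal_t):
 *
 *      lib_nal_t kibnal_lib = {
 *              libnal_data:       &kibnal_data,
 *              libnal_send:       kibnal_send,
 *              libnal_send_pages: kibnal_send_pages,
 *              libnal_recv:       kibnal_recv,
 *              libnal_recv_pages: kibnal_recv_pages,
 *              libnal_dist:       kibnal_dist,
 *      };
 */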
1722
1723 int
1724 kibnal_thread_start (int (*fn)(void *arg), void *arg)
1725 {
1726         long    pid = kernel_thread (fn, arg, 0);
1727
1728         if (pid < 0)
1729                 return ((int)pid);
1730
1731         atomic_inc (&kibnal_data.kib_nthreads);
1732         return (0);
1733 }
1734
1735 void
1736 kibnal_thread_fini (void)
1737 {
1738         atomic_dec (&kibnal_data.kib_nthreads);
1739 }
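
/* Editor's note: a hypothetical usage sketch (not in the original): any
 * thread spawned via kibnal_thread_start() must call kibnal_thread_fini()
 * on exit, so kib_nthreads keeps an accurate count of live threads. */
#if 0   /* illustration only */
static int
kibnal_example_thread (void *arg)
{
        /* ... service loop ... */
        kibnal_thread_fini();           /* balance the atomic_inc above */
        return 0;
}
/* spawned with: rc = kibnal_thread_start(kibnal_example_thread, NULL); */
#endif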
1740
1741 void
1742 kibnal_close_conn_locked (kib_conn_t *conn, int error)
1743 {
1744         /* This just does the immediate housekeeping.  'error' is zero for a
1745          * normal shutdown, which can happen only after the connection has
1746          * been established.  If the connection is established, schedule the
1747          * connection to be finished off by the connd.  Otherwise the connd is
1748          * already dealing with it (either to set it up or tear it down).
1749          * Caller holds kib_global_lock exclusively in irq context */
1750         kib_peer_t   *peer = conn->ibc_peer;
1751
1752         LASSERT (error != 0 || conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
1753
1754         if (error != 0 && conn->ibc_comms_error == 0)
1755                 conn->ibc_comms_error = error;
1756
1757         if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
1758                 return; /* already being handled  */
1759
1760         CDEBUG (error == 0 ? D_NET : D_ERROR,
1761                 "closing conn to "LPX64": error %d\n", peer->ibp_nid, error);
1762
1763         /* kib_connd_conns takes ibc_list's ref */
1764         list_del (&conn->ibc_list);
1765         
1766         if (list_empty (&peer->ibp_conns) &&
1767             peer->ibp_persistence == 0) {
1768                 /* Non-persistent peer with no more conns... */
1769                 kibnal_unlink_peer_locked (peer);
1770         }
1771
1772         kibnal_set_conn_state(conn, IBNAL_CONN_DISCONNECT1);
1773
1774         spin_lock(&kibnal_data.kib_connd_lock);
1775
1776         list_add_tail (&conn->ibc_list, &kibnal_data.kib_connd_conns);
1777         wake_up (&kibnal_data.kib_connd_waitq);
1778                 
1779         spin_unlock(&kibnal_data.kib_connd_lock);
1780 }
1781
1782 void
1783 kibnal_close_conn (kib_conn_t *conn, int error)
1784 {
1785         unsigned long flags;
1786         
1787         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1788
1789         kibnal_close_conn_locked (conn, error);
1790         
1791         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1792 }
1793
1794 void
1795 kibnal_handle_early_rxs(kib_conn_t *conn)
1796 {
1797         unsigned long    flags;
1798         kib_rx_t        *rx;
1799
1800         LASSERT (!in_interrupt());
1801         LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
1802         
1803         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1804         while (!list_empty(&conn->ibc_early_rxs)) {
1805                 rx = list_entry(conn->ibc_early_rxs.next,
1806                                 kib_rx_t, rx_list);
1807                 list_del(&rx->rx_list);
1808                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1809                 
1810                 kibnal_handle_rx(rx);
1811                 
1812                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1813         }
1814         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1815 }
1816
1817 void
1818 kibnal_conn_disconnected(kib_conn_t *conn)
1819 {
1820         LIST_HEAD        (zombies); 
1821         struct list_head *tmp;
1822         struct list_head *nxt;
1823         kib_tx_t         *tx;
1824
1825         /* I'm the connd */
1826         LASSERT (!in_interrupt());
1827         LASSERT (current == kibnal_data.kib_connd);
1828         LASSERT (conn->ibc_state >= IBNAL_CONN_INIT);
1829         
1830         kibnal_set_conn_state(conn, IBNAL_CONN_DISCONNECTED);
1831
1832         /* move QP to error state to make posted work items complete */
1833         kibnal_set_qp_state(conn, vv_qp_state_error);
1834
1835         spin_lock(&conn->ibc_lock);
1836
1837         /* Complete all tx descs not waiting for sends to complete.
1838          * NB we should be safe from RDMA now that the QP has changed state */
1839
1840         list_for_each_safe (tmp, nxt, &conn->ibc_tx_queue) {
1841                 tx = list_entry (tmp, kib_tx_t, tx_list);
1842
1843                 tx->tx_status = -ECONNABORTED;
1844                 tx->tx_waiting = 0;
1845                 
1846                 if (tx->tx_sending != 0)
1847                         continue;
1848
1849                 list_del (&tx->tx_list);
1850                 list_add (&tx->tx_list, &zombies);
1851         }
1852
1853         list_for_each_safe (tmp, nxt, &conn->ibc_active_txs) {
1854                 tx = list_entry (tmp, kib_tx_t, tx_list);
1855
1856                 LASSERT (tx->tx_waiting ||
1857                          tx->tx_sending != 0);
1858
1859                 tx->tx_status = -ECONNABORTED;
1860                 tx->tx_waiting = 0;
1861                 
1862                 if (tx->tx_sending != 0)
1863                         continue;
1864
1865                 list_del (&tx->tx_list);
1866                 list_add (&tx->tx_list, &zombies);
1867         }
1868         
1869         spin_unlock(&conn->ibc_lock);
1870
1871         while (!list_empty(&zombies)) {
1872                 tx = list_entry (zombies.next, kib_tx_t, tx_list);
1873
1874                 list_del(&tx->tx_list);
1875                 kibnal_tx_done (tx);
1876         }
1877
1878         kibnal_handle_early_rxs(conn);
1879 }
1880
1881 void
1882 kibnal_peer_connect_failed (kib_peer_t *peer, int active)
1883 {
1884         LIST_HEAD         (zombies);    /* initialised: tested below even when left empty */
1885         kib_tx_t         *tx;
1886         unsigned long     flags;
1887
1888         /* Only the connd creates conns => single threaded */
1889         LASSERT (!in_interrupt());
1890         LASSERT (current == kibnal_data.kib_connd);
1891         LASSERT (peer->ibp_reconnect_interval >= IBNAL_MIN_RECONNECT_INTERVAL);
1892
1893         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1894
1895         if (active) {
1896                 LASSERT (peer->ibp_connecting != 0);
1897                 peer->ibp_connecting--;
1898         } else {
1899                 LASSERT (!kibnal_peer_active(peer));
1900         }
1901         
1902         if (peer->ibp_connecting != 0) {
1903                 /* another connection attempt under way (loopback?)... */
1904                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1905                 return;
1906         }
1907
1908         if (list_empty(&peer->ibp_conns)) {
1909                 /* Say when active connection can be re-attempted */
1910                 peer->ibp_reconnect_time = jiffies + peer->ibp_reconnect_interval;
1911                 /* Increase reconnection interval */
1912                 peer->ibp_reconnect_interval = MIN (peer->ibp_reconnect_interval * 2,
1913                                                     IBNAL_MAX_RECONNECT_INTERVAL);
1914         
1915                 /* Take peer's blocked transmits to complete with error */
1916                 list_add(&zombies, &peer->ibp_tx_queue);
1917                 list_del_init(&peer->ibp_tx_queue);
1918                 
1919                 if (kibnal_peer_active(peer) &&
1920                     (peer->ibp_persistence == 0)) {
1921                         /* failed connection attempt on non-persistent peer */
1922                         kibnal_unlink_peer_locked (peer);
1923                 }
1924         } else {
1925                 /* Can't have blocked transmits if there are connections */
1926                 LASSERT (list_empty(&peer->ibp_tx_queue));
1927         }
1928         
1929         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1930
1931         if (list_empty (&zombies)) 
1932                 return;
1933         
1934         CERROR ("Deleting messages for "LPX64": connection failed\n", peer->ibp_nid);
1935         do {
1936                 tx = list_entry (zombies.next, kib_tx_t, tx_list);
1937
1938                 list_del (&tx->tx_list);
1939                 /* complete now */
1940                 tx->tx_status = -EHOSTUNREACH;
1941                 kibnal_tx_done (tx);
1942         } while (!list_empty (&zombies));
1943 }
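
/* Editor's note: a minimal sketch (not in the original) of the reconnect
 * backoff applied above: the interval doubles after each failure, clamped
 * at IBNAL_MAX_RECONNECT_INTERVAL, and kibnal_connreq_done() resets it to
 * IBNAL_MIN_RECONNECT_INTERVAL on success. */
static inline unsigned long
kibnal_next_reconnect_interval (unsigned long interval)
{
        return MIN (interval * 2, IBNAL_MAX_RECONNECT_INTERVAL);
}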
1944
1945 void
1946 kibnal_connreq_done(kib_conn_t *conn, int active, int status)
1947 {
1948         static cm_reject_data_t   rej;
1949
1950         struct list_head   txs;
1951         kib_peer_t        *peer = conn->ibc_peer;
1952         kib_peer_t        *peer2;
1953         unsigned long      flags;
1954         kib_tx_t          *tx;
1955
1956         /* Only the connd creates conns => single threaded */
1957         LASSERT (!in_interrupt());
1958         LASSERT (current == kibnal_data.kib_connd);
1959         LASSERT (conn->ibc_state < IBNAL_CONN_ESTABLISHED);
1960
1961         if (active) {
1962                 LASSERT (peer->ibp_connecting > 0);
1963         } else {
1964                 LASSERT (!kibnal_peer_active(peer));
1965         }
1966         
1967         PORTAL_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
1968         conn->ibc_connvars = NULL;
1969
1970         if (status != 0) {
1971                 /* failed to establish connection */
1972                 switch (conn->ibc_state) {
1973                 default:
1974                         LBUG();
1975                 case IBNAL_CONN_ACTIVE_CHECK_REPLY:
1976                         /* got a connection reply but failed checks */
1977                         LASSERT (active);
1978                         memset(&rej, 0, sizeof(rej));
1979                         rej.reason = cm_rej_code_usr_rej;
1980                         cm_reject(conn->ibc_cep, &rej);
1981                         break;
1982
1983                 case IBNAL_CONN_ACTIVE_CONNECT:
1984                         LASSERT (active);
1985                         cm_cancel(conn->ibc_cep);
1986                         kibnal_pause(HZ/10);
1987                         /* cm_connect() failed immediately or
1988                          * callback returned failure */
1989                         break;
1990
1991                 case IBNAL_CONN_ACTIVE_ARP:
1992                         LASSERT (active);
1993                         /* ibat_get_ib_data() failed immediately 
1994                          * or callback returned failure */
1995                         break;
1996
1997                 case IBNAL_CONN_INIT:
1998                         break;
1999
2000                 case IBNAL_CONN_PASSIVE_WAIT:
2001                         LASSERT (!active);
2002                         /* cm_accept callback returned failure */
2003                         break;
2004                 }
2005
2006                 kibnal_peer_connect_failed(conn->ibc_peer, active);
2007                 kibnal_conn_disconnected(conn);
2008                 return;
2009         }
2010
2011         /* connection established */
2012         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2013
2014         if (active) {
2015                 LASSERT(conn->ibc_state == IBNAL_CONN_ACTIVE_RTU);
2016         } else {
2017                 LASSERT(conn->ibc_state == IBNAL_CONN_PASSIVE_WAIT);
2018         }
2019         
2020         kibnal_set_conn_state(conn, IBNAL_CONN_ESTABLISHED);
2021
2022         if (!active) {
2023                 peer2 = kibnal_find_peer_locked(peer->ibp_nid);
2024                 if (peer2 != NULL) {
2025                         /* already in the peer table; swap */
2026                         conn->ibc_peer = peer2;
2027                         kibnal_peer_addref(peer2);
2028                         kibnal_peer_decref(peer);
2029                         peer = conn->ibc_peer;
2030                 } else {
2031                         /* add 'peer' to the peer table */
2032                         kibnal_peer_addref(peer);
2033                         list_add_tail(&peer->ibp_list,
2034                                       kibnal_nid2peerlist(peer->ibp_nid));
2035                 }
2036         }
2037         
2038         /* Add conn to peer's list and nuke any dangling conns from a different
2039          * peer instance... */
2040         kibnal_conn_addref(conn);               /* +1 ref for ibc_list */
2041         list_add(&conn->ibc_list, &peer->ibp_conns);
2042         kibnal_close_stale_conns_locked (conn->ibc_peer,
2043                                          conn->ibc_incarnation);
2044
2045         if (!kibnal_peer_active(peer) ||        /* peer has been deleted */
2046             conn->ibc_comms_error != 0 ||       /* comms error */
2047             conn->ibc_disconnect) {             /* need to disconnect */
2048                 
2049                 /* start to shut down connection */
2050                 kibnal_close_conn_locked(conn, -ECONNABORTED);
2051
2052                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2053                 kibnal_peer_connect_failed(peer, active);
2054                 return;
2055         }
2056
2057         if (active)
2058                 peer->ibp_connecting--;
2059
2060         /* grab pending txs while I have the lock */
2061         list_add(&txs, &peer->ibp_tx_queue);
2062         list_del_init(&peer->ibp_tx_queue);
2063         
2064         /* reset reconnect interval for next attempt */
2065         peer->ibp_reconnect_interval = IBNAL_MIN_RECONNECT_INTERVAL;
2066         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2067
2068         /* Schedule blocked txs */
2069         spin_lock (&conn->ibc_lock);
2070         while (!list_empty (&txs)) {
2071                 tx = list_entry (txs.next, kib_tx_t, tx_list);
2072                 list_del (&tx->tx_list);
2073
2074                 kibnal_queue_tx_locked (tx, conn);
2075         }
2076         spin_unlock (&conn->ibc_lock);
2077         kibnal_check_sends (conn);
2078
2079         /* schedule blocked rxs */
2080         kibnal_handle_early_rxs(conn);
2081 }
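
/* Editor's note: a sketch (not in the original) of the two-line list
 * transplant used above and in kibnal_peer_connect_failed() to grab a
 * peer's blocked tx queue under the lock; it behaves like
 * list_splice_init() when 'dst' starts out unlinked. */
static inline void
kibnal_grab_txs (struct list_head *dst, struct list_head *src)
{
        list_add (dst, src);            /* 'dst' becomes the chain's new head */
        list_del_init (src);            /* 'src' is left empty */
}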
2082
2083 void
2084 kibnal_cm_callback(cm_cep_handle_t cep, cm_conn_data_t *cmdata, void *arg)
2085 {
2086         static cm_dreply_data_t drep;           /* just zeroed space */
2087         
2088         kib_conn_t             *conn = (kib_conn_t *)arg;
2089         unsigned long           flags;
2090         
2091         /* CAVEAT EMPTOR: tasklet context */
2092
2093         switch (cmdata->status) {
2094         default:
2095                 LBUG();
2096                 
2097         case cm_event_disconn_request:
2098                 /* IBNAL_CONN_ACTIVE_RTU:  gets closed in kibnal_connreq_done
2099                  * IBNAL_CONN_ESTABLISHED: I start it closing
2100                  * otherwise:              it's closing anyway */
2101                 cm_disconnect(conn->ibc_cep, NULL, &drep);
2102                 cm_cancel(conn->ibc_cep);
2103
2104                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2105                 LASSERT (!conn->ibc_disconnect);
2106                 conn->ibc_disconnect = 1;
2107
2108                 switch (conn->ibc_state) {
2109                 default:
2110                         LBUG();
2111
2112                 case IBNAL_CONN_ACTIVE_RTU:
2113                         /* kibnal_connreq_done is getting there; it'll see
2114                          * ibc_disconnect set... */
2115                         kibnal_conn_decref(conn); /* lose my ref */
2116                         break;
2117
2118                 case IBNAL_CONN_ESTABLISHED:
2119                         /* kibnal_connreq_done got there already; get
2120                          * disconnect going... */
2121                         kibnal_close_conn_locked(conn, 0);
2122                         kibnal_conn_decref(conn); /* lose my ref */
2123                         break;
2124
2125                 case IBNAL_CONN_DISCONNECT1:
2126                         /* kibnal_terminate_conn is getting there; it'll see
2127                          * ibc_disconnect set... */
2128                         kibnal_conn_decref(conn); /* lose my ref */
2129                         break;
2130
2131                 case IBNAL_CONN_DISCONNECT2:
2132                         /* kibnal_terminate_conn got there already; complete
2133                          * the disconnect.  NB kib_connd_conns takes my ref */
2134                         spin_lock(&kibnal_data.kib_connd_lock);
2135                         list_add_tail(&conn->ibc_list, &kibnal_data.kib_connd_conns);
2136                         wake_up(&kibnal_data.kib_connd_waitq);
2137                         spin_unlock(&kibnal_data.kib_connd_lock);
2138                         break;
2139                 }
2140                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2141                 return;
2142                 
2143         case cm_event_disconn_timeout:
2144         case cm_event_disconn_reply:
2145                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2146                 LASSERT (conn->ibc_state == IBNAL_CONN_DISCONNECT2);
2147                 LASSERT (!conn->ibc_disconnect);
2148                 conn->ibc_disconnect = 1;
2149
2150                 /* kibnal_terminate_conn sent the disconnect request.  
2151                  * NB kib_connd_conns takes my ref */
2152                 spin_lock(&kibnal_data.kib_connd_lock);
2153                 list_add_tail(&conn->ibc_list, &kibnal_data.kib_connd_conns);
2154                 wake_up(&kibnal_data.kib_connd_waitq);
2155                 spin_unlock(&kibnal_data.kib_connd_lock);
2156
2157                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2158                 break;
2159                 
2160         case cm_event_connected:
2161         case cm_event_conn_timeout:
2162         case cm_event_conn_reject:
2163                 LASSERT (conn->ibc_state == IBNAL_CONN_PASSIVE_WAIT);
2164                 conn->ibc_connvars->cv_conndata = *cmdata;
2165                 
2166                 spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2167                 list_add_tail(&conn->ibc_list, &kibnal_data.kib_connd_conns);
2168                 wake_up(&kibnal_data.kib_connd_waitq);
2169                 spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
2170                 break;
2171         }
2172 }
2173
2174 void
2175 kibnal_check_passive_wait(kib_conn_t *conn)
2176 {
2177         int     rc;
2178
2179         switch (conn->ibc_connvars->cv_conndata.status) {
2180         default:
2181                 LBUG();
2182                 
2183         case cm_event_connected:
2184                 kibnal_conn_addref(conn); /* ++ ref for CM callback */
2185                 rc = kibnal_set_qp_state(conn, vv_qp_state_rts);
2186                 if (rc != 0)
2187                         conn->ibc_comms_error = rc;
2188                 /* connection _has_ been established; it's just that we've had
2189                  * an error immediately... */
2190                 kibnal_connreq_done(conn, 0, 0);
2191                 break;
2192                 
2193         case cm_event_conn_timeout:
2194                 kibnal_connreq_done(conn, 0, -ETIMEDOUT);
2195                 break;
2196                 
2197         case cm_event_conn_reject:
2198                 kibnal_connreq_done(conn, 0, -ECONNRESET);
2199                 break;
2200         }
2201 }
2202
2203 void
2204 kibnal_recv_connreq(cm_cep_handle_t *cep, cm_request_data_t *cmreq)
2205 {
2206         static cm_reply_data_t  reply;
2207         static cm_reject_data_t reject;
2208
2209         kib_msg_t          *rxmsg = (kib_msg_t *)cmreq->priv_data;
2210         kib_msg_t          *txmsg;
2211         kib_conn_t         *conn = NULL;
2212         int                 rc = 0;
2213         kib_connvars_t     *cv;
2214         kib_peer_t         *tmp_peer;
2215         cm_return_t         cmrc;
2216         vv_return_t         vvrc;
2217         
2218         /* I'm the connd executing in thread context
2219          * No concurrency problems with static data! */
2220         LASSERT (!in_interrupt());
2221         LASSERT (current == kibnal_data.kib_connd);
2222
2223         if (cmreq->sid != IBNAL_SERVICE_NUMBER) {
2224                 CERROR(LPX64" != IBNAL_SERVICE_NUMBER("LPX64")\n",
2225                        cmreq->sid, (__u64)IBNAL_SERVICE_NUMBER);
2226                 goto reject;
2227         }
2228
2229         rc = kibnal_unpack_msg(rxmsg, cm_REQ_priv_data_len);
2230         if (rc != 0) {
2231                 CERROR("Can't parse connection request: %d\n", rc);
2232                 goto reject;
2233         }
2234
2235         if (rxmsg->ibm_type != IBNAL_MSG_CONNREQ) {
2236                 CERROR("Unexpected connreq msg type: %x from "LPX64"\n",
2237                        rxmsg->ibm_type, rxmsg->ibm_srcnid);
2238                 goto reject;
2239         }
2240
2241         if (rxmsg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid) {
2242                 CERROR("Can't accept "LPX64": bad dst nid "LPX64"\n",
2243                        rxmsg->ibm_srcnid, rxmsg->ibm_dstnid);
2244                 goto reject;
2245         }
2246
2247         if (rxmsg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
2248                 CERROR("Can't accept "LPX64": incompatible queue depth %d (%d wanted)\n",
2249                        rxmsg->ibm_srcnid, rxmsg->ibm_u.connparams.ibcp_queue_depth, 
2250                        IBNAL_MSG_QUEUE_SIZE);
2251                 goto reject;
2252         }
2253
2254         if (rxmsg->ibm_u.connparams.ibcp_max_msg_size > IBNAL_MSG_SIZE) {
2255                 CERROR("Can't accept "LPX64": message size %d too big (%d max)\n",
2256                        rxmsg->ibm_srcnid, rxmsg->ibm_u.connparams.ibcp_max_msg_size, 
2257                        IBNAL_MSG_SIZE);
2258                 goto reject;
2259         }
2260                 
2261         if (rxmsg->ibm_u.connparams.ibcp_max_frags > IBNAL_MAX_RDMA_FRAGS) {
2262                 CERROR("Can't accept "LPX64": max frags %d too big (%d max)\n",
2263                        rxmsg->ibm_srcnid, rxmsg->ibm_u.connparams.ibcp_max_frags, 
2264                        IBNAL_MAX_RDMA_FRAGS);
2265                 goto reject;
2266         }
2267                 
2268         conn = kibnal_create_conn(cep);
2269         if (conn == NULL) {
2270                 CERROR("Can't create conn for "LPX64"\n", rxmsg->ibm_srcnid);
2271                 goto reject;
2272         }
2273         
2274         /* assume 'rxmsg->ibm_srcnid' is a new peer */
2275         tmp_peer = kibnal_create_peer (rxmsg->ibm_srcnid);
2276         if (tmp_peer == NULL) {
2277                 CERROR("Can't create tmp peer for "LPX64"\n", rxmsg->ibm_srcnid);
2278                 kibnal_conn_decref(conn);
2279                 conn = NULL;
2280                 goto reject;
2281         }
2282
2283         conn->ibc_peer = tmp_peer;              /* conn takes over my ref */
2284         conn->ibc_incarnation = rxmsg->ibm_srcstamp;
2285         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2286
2287         cv = conn->ibc_connvars;
2288
2289         cv->cv_txpsn          = cmreq->cep_data.start_psn;
2290         cv->cv_remote_qpn     = cmreq->cep_data.qpn;
2291         cv->cv_path           = cmreq->path_data.path;
2292         cv->cv_rnr_count      = cmreq->cep_data.rtr_retry_cnt;
2293         // XXX                  cmreq->cep_data.retry_cnt;
2294         cv->cv_port           = cmreq->cep_data.local_port_num;
2295
2296         vvrc = gid2gid_index(kibnal_data.kib_hca, cv->cv_port,
2297                              &cv->cv_path.sgid, &cv->cv_sgid_index);
2298         LASSERT (vvrc == vv_return_ok);
2299         
2300         vvrc = pkey2pkey_index(kibnal_data.kib_hca, cv->cv_port,
2301                                cv->cv_path.pkey, &cv->cv_pkey_index);
2302         LASSERT (vvrc == vv_return_ok);
2303
2304         rc = kibnal_set_qp_state(conn, vv_qp_state_init);
2305         if (rc != 0)
2306                 goto reject;
2307
2308         rc = kibnal_post_receives(conn);
2309         if (rc != 0) {
2310                 CERROR("Can't post receives for "LPX64"\n", rxmsg->ibm_srcnid);
2311                 goto reject;
2312         }
2313
2314         rc = kibnal_set_qp_state(conn, vv_qp_state_rtr);
2315         if (rc != 0)
2316                 goto reject;
2317         
2318         memset(&reply, 0, sizeof(reply));
2319         reply.qpn                 = cv->cv_local_qpn;
2320         reply.qkey                = IBNAL_QKEY;
2321         reply.start_psn           = cv->cv_rxpsn;
2322         reply.arb_initiator_depth = IBNAL_ARB_INITIATOR_DEPTH;
2323         reply.arb_resp_res        = IBNAL_ARB_RESP_RES;
2324         reply.failover_accepted   = IBNAL_FAILOVER_ACCEPTED;
2325         reply.rnr_retry_count     = cv->cv_rnr_count;
2326         reply.targ_ack_delay      = kibnal_data.kib_hca_attrs.ack_delay;
2327         
2328         txmsg = (kib_msg_t *)&reply.priv_data;
2329         kibnal_init_msg(txmsg, IBNAL_MSG_CONNACK, 
2330                         sizeof(txmsg->ibm_u.connparams));
2331         LASSERT (txmsg->ibm_nob <= cm_REP_priv_data_len);
2332         txmsg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
2333         txmsg->ibm_u.connparams.ibcp_max_msg_size = IBNAL_MSG_SIZE;
2334         txmsg->ibm_u.connparams.ibcp_max_frags = IBNAL_MAX_RDMA_FRAGS;
2335         kibnal_pack_msg(txmsg, 0, rxmsg->ibm_srcnid, rxmsg->ibm_srcstamp);
2336         
2337         kibnal_set_conn_state(conn, IBNAL_CONN_PASSIVE_WAIT);
2338         
2339         cmrc = cm_accept(conn->ibc_cep, &reply, NULL,
2340                          kibnal_cm_callback, conn);
2341
2342         if (cmrc == cm_stat_success)
2343                 return;                         /* callback has got my ref on conn */
2344
2345         /* back out state change (no callback happening) */
2346         kibnal_set_conn_state(conn, IBNAL_CONN_INIT);
2347         rc = -EIO;
2348                 
2349  reject:
2350         CERROR("Rejected connreq from "LPX64"\n", rxmsg->ibm_srcnid);
2351
2352         memset(&reject, 0, sizeof(reject));
2353         reject.reason = cm_rej_code_usr_rej;
2354         cm_reject(cep, &reject);
2355
2356         if (conn != NULL) {
2357                 LASSERT (rc != 0);
2358                 kibnal_connreq_done(conn, 0, rc);
2359         } else {
2360                 cm_destroy_cep(cep);
2361         }
2362 }
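
/* Editor's note: a minimal sketch (not in the original) factoring out the
 * connparams checks made above and repeated in kibnal_check_connreply():
 * both ends must agree exactly on queue depth and must not exceed our
 * message-size and RDMA-fragment limits.  The parameter type is assumed
 * to be the type of the ibm_u.connparams member. */
static inline int
kibnal_connparams_ok (kib_connparams_t *cp)
{
        return (cp->ibcp_queue_depth  == IBNAL_MSG_QUEUE_SIZE &&
                cp->ibcp_max_msg_size <= IBNAL_MSG_SIZE &&
                cp->ibcp_max_frags    <= IBNAL_MAX_RDMA_FRAGS);
}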
2363
2364 void
2365 kibnal_listen_callback(cm_cep_handle_t cep, cm_conn_data_t *data, void *arg)
2366 {
2367         cm_request_data_t  *cmreq = &data->data.request;
2368         kib_pcreq_t        *pcr;
2369         unsigned long       flags;
2370         
2371         LASSERT (arg == NULL);
2372
2373         if (data->status != cm_event_conn_request) {
2374                 CERROR("status %d is not cm_event_conn_request\n",
2375                        data->status);
2376                 return;
2377         }
2378
2379         PORTAL_ALLOC_ATOMIC(pcr, sizeof(*pcr));
2380         if (pcr == NULL) {
2381                 CERROR("Can't allocate passive connreq\n");
2382
2383                 cm_reject(cep, &((cm_reject_data_t) /* NB RO struct */
2384                                  {.reason = cm_rej_code_no_res,}));
2385                 cm_destroy_cep(cep);
2386                 return;
2387         }
2388
2389         pcr->pcr_cep = cep;
2390         pcr->pcr_cmreq = *cmreq;
2391         
2392         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2393
2394         list_add_tail(&pcr->pcr_list, &kibnal_data.kib_connd_pcreqs);
2395         wake_up(&kibnal_data.kib_connd_waitq);
2396         
2397         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
2398 }
2399
2400
2401 void
2402 kibnal_active_connect_callback (cm_cep_handle_t cep, cm_conn_data_t *cd, 
2403                                 void *arg)
2404 {
2405         /* CAVEAT EMPTOR: tasklet context */
2406         kib_conn_t       *conn = (kib_conn_t *)arg;
2407         kib_connvars_t   *cv = conn->ibc_connvars;
2408         unsigned long     flags;
2409
2410         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_CONNECT);
2411         cv->cv_conndata = *cd;
2412
2413         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2414         /* connd takes my ref */
2415         list_add_tail(&conn->ibc_list, &kibnal_data.kib_connd_conns);
2416         wake_up(&kibnal_data.kib_connd_waitq);
2417         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
2418 }
2419
2420 void
2421 kibnal_connect_conn (kib_conn_t *conn)
2422 {
2423         static cm_request_data_t  cmreq;
2424         kib_msg_t                *msg = (kib_msg_t *)&cmreq.priv_data;
2425         kib_connvars_t           *cv = conn->ibc_connvars;
2426         kib_peer_t               *peer = conn->ibc_peer;
2427         cm_return_t               cmrc;
2428         
2429         /* Only called by connd => statics OK */
2430         LASSERT (!in_interrupt());
2431         LASSERT (current == kibnal_data.kib_connd);
2432         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_ARP);
2433
2434         memset(&cmreq, 0, sizeof(cmreq));
2435         
2436         cmreq.sid = IBNAL_SERVICE_NUMBER;
2437
2438         cmreq.cep_data.ca_guid              = kibnal_data.kib_hca_attrs.guid;
2439         cmreq.cep_data.qpn                  = cv->cv_local_qpn;
2440         cmreq.cep_data.retry_cnt            = IBNAL_RETRY_CNT;
2441         cmreq.cep_data.rtr_retry_cnt        = IBNAL_RNR_CNT;
2442         cmreq.cep_data.start_psn            = cv->cv_rxpsn;
2443         cmreq.cep_data.end_to_end_flow_ctrl = IBNAL_EE_FLOW_CNT;
2444         // XXX ack_timeout?
2445         // offered_resp_res
2446         // offered_initiator_depth
2447
2448         cmreq.path_data.subn_local  = IBNAL_LOCAL_SUB;
2449         cmreq.path_data.path        = cv->cv_path;
2450         
2451         kibnal_init_msg(msg, IBNAL_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
2452         LASSERT(msg->ibm_nob <= cm_REQ_priv_data_len);
2453         msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
2454         msg->ibm_u.connparams.ibcp_max_msg_size = IBNAL_MSG_SIZE;
2455         msg->ibm_u.connparams.ibcp_max_frags = IBNAL_MAX_RDMA_FRAGS;
2456         kibnal_pack_msg(msg, 0, peer->ibp_nid, 0);
2457         
2458         CDEBUG(D_NET, "Connecting %p to "LPX64"\n", conn, peer->ibp_nid);
2459
2460         kibnal_conn_addref(conn);               /* ++ref for CM callback */
2461         kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_CONNECT);
2462
2463         cmrc = cm_connect(conn->ibc_cep, &cmreq, 
2464                           kibnal_active_connect_callback, conn);
2465         if (cmrc == cm_stat_success) {
2466                 CDEBUG(D_NET, "connection REQ sent to "LPX64"\n",
2467                        peer->ibp_nid);
2468                 return;
2469         }
2470
2471         CERROR ("Connect "LPX64" failed: %d\n", peer->ibp_nid, cmrc);
2472         kibnal_conn_decref(conn);       /* drop callback's ref */
2473         kibnal_connreq_done(conn, 1, -EHOSTUNREACH);
2474 }
2475
2476 void
2477 kibnal_check_connreply (kib_conn_t *conn)
2478 {
2479         static cm_rtu_data_t  rtu;
2480
2481         kib_connvars_t   *cv = conn->ibc_connvars;
2482         cm_reply_data_t  *reply = &cv->cv_conndata.data.reply;
2483         kib_msg_t        *msg = (kib_msg_t *)&reply->priv_data;
2484         kib_peer_t       *peer = conn->ibc_peer;
2485         cm_return_t       cmrc;
2486         cm_cep_handle_t   cep;
2487         unsigned long     flags;
2488         int               rc;
2489
2490         /* Only called by connd => statics OK */
2491         LASSERT (!in_interrupt());
2492         LASSERT (current == kibnal_data.kib_connd);
2493         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_CONNECT);
2494
2495         if (cv->cv_conndata.status == cm_event_conn_reply) {
2496                 cv->cv_remote_qpn = reply->qpn;
2497                 cv->cv_txpsn      = reply->start_psn;
2498                 // XXX              reply->targ_ack_delay;
2499                 cv->cv_rnr_count  = reply->rnr_retry_count;
2500
2501                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_CHECK_REPLY);
2502
2503                 rc = kibnal_unpack_msg(msg, cm_REP_priv_data_len);
2504                 if (rc != 0) {
2505                         CERROR("Can't unpack reply from "LPX64"\n",
2506                                peer->ibp_nid);
2507                         kibnal_connreq_done(conn, 1, rc);
2508                         return;
2509                 }
2510
2511                 if (msg->ibm_type != IBNAL_MSG_CONNACK) {
2512                         CERROR("Unexpected message type %d from "LPX64"\n",
2513                                msg->ibm_type, peer->ibp_nid);
2514                         kibnal_connreq_done(conn, 1, -EPROTO);
2515                         return;
2516                 }
2517
2518                 if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
2519                         CERROR(LPX64" has incompatible queue depth %d(%d wanted)\n",
2520                                peer->ibp_nid, msg->ibm_u.connparams.ibcp_queue_depth,
2521                                IBNAL_MSG_QUEUE_SIZE);
2522                         kibnal_connreq_done(conn, 1, -EPROTO);
2523                         return;
2524                 }
2525                 
2526                 if (msg->ibm_u.connparams.ibcp_max_msg_size > IBNAL_MSG_SIZE) {
2527                         CERROR(LPX64" max message size %d too big (%d max)\n",
2528                                peer->ibp_nid, msg->ibm_u.connparams.ibcp_max_msg_size, 
2529                                IBNAL_MSG_SIZE);
2530                         kibnal_connreq_done(conn, 1, -EPROTO);
2531                         return;
2532                 }
2533
2534                 if (msg->ibm_u.connparams.ibcp_max_frags > IBNAL_MAX_RDMA_FRAGS) {
2535                         CERROR(LPX64" max frags %d too big (%d max)\n",
2536                                peer->ibp_nid, msg->ibm_u.connparams.ibcp_max_frags, 
2537                                IBNAL_MAX_RDMA_FRAGS);
2538                         kibnal_connreq_done(conn, 1, -EPROTO);
2539                         return;
2540                 }
2541                 
2542                 read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2543                 rc = (msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid ||
2544                       msg->ibm_dststamp != kibnal_data.kib_incarnation) ?
2545                      -ESTALE : 0;
2546                 read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2547                 if (rc != 0) {
2548                         CERROR("Stale connection reply from "LPX64"\n",
2549                                peer->ibp_nid);
2550                         kibnal_connreq_done(conn, 1, rc);
2551                         return;
2552                 }
2553
2554                 conn->ibc_incarnation = msg->ibm_srcstamp;
2555                 conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2556                 
2557                 rc = kibnal_post_receives(conn);
2558                 if (rc != 0) {
2559                         CERROR("Can't post receives for "LPX64"\n",
2560                                peer->ibp_nid);
2561                         kibnal_connreq_done(conn, 1, rc);
2562                         return;
2563                 }
2564                 
2565                 rc = kibnal_set_qp_state(conn, vv_qp_state_rtr);
2566                 if (rc != 0) {
2567                         kibnal_connreq_done(conn, 1, rc);
2568                         return;
2569                 }
2570                 
2571                 rc = kibnal_set_qp_state(conn, vv_qp_state_rts);
2572                 if (rc != 0) {
2573                         kibnal_connreq_done(conn, 1, rc);
2574                         return;
2575                 }
2576                 
2577                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_RTU);
2578                 kibnal_conn_addref(conn);       /* ++for CM callback */
2579                 
2580                 memset(&rtu, 0, sizeof(rtu));
2581                 cmrc = cm_accept(conn->ibc_cep, NULL, &rtu,
2582                                  kibnal_cm_callback, conn);
2583                 if (cmrc == cm_stat_success) {
2584                         /* Now I'm racing with disconnect signalled by
2585                          * kibnal_cm_callback */
2586                         kibnal_connreq_done(conn, 1, 0);
2587                         return;
2588                 }
2589
2590                 CERROR("cm_accept "LPX64" failed: %d\n", peer->ibp_nid, cmrc);
2591                 /* Back out of RTU: no callback coming */
2592                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_CHECK_REPLY);
2593                 kibnal_conn_decref(conn);
2594                 kibnal_connreq_done(conn, 1, -EIO);
2595                 return;
2596         }
2597
2598         if (cv->cv_conndata.status == cm_event_conn_reject) {
2599
2600                 if (cv->cv_conndata.data.reject.reason != cm_rej_code_stale_conn) {
2601                         CERROR("conn -> "LPX64" rejected: %d\n", peer->ibp_nid,
2602                                cv->cv_conndata.data.reject.reason);
2603                         kibnal_connreq_done(conn, 1, -ECONNREFUSED);
2604                         return;
2605                 }
2606
2607                 CWARN ("conn -> "LPX64" stale: retrying\n", peer->ibp_nid);
2608
2609                 cep = cm_create_cep(cm_cep_transp_rc);
2610                 if (cep == NULL) {
2611                         CERROR("Can't create new CEP\n");
2612                         kibnal_connreq_done(conn, 1, -ENOMEM);
2613                         return;
2614                 }
2615
2616                 cmrc = cm_cancel(conn->ibc_cep);
2617                 LASSERT (cmrc == cm_stat_success);
2618                 cmrc = cm_destroy_cep(conn->ibc_cep);
2619                 LASSERT (cmrc == cm_stat_success);
2620
2621                 conn->ibc_cep = cep;
2622
2623                 /* retry connect */
2624                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_ARP);
2625                 kibnal_connect_conn(conn);
2626                 return;
2627         }
2628
2629         CERROR("conn -> "LPX64" failed: %d\n", peer->ibp_nid,
2630                cv->cv_conndata.status);
2631         kibnal_connreq_done(conn, 1, -ECONNABORTED);
2632 }
2633
2634 void
2635 kibnal_send_connreq (kib_conn_t *conn)
2636 {
2637         kib_peer_t           *peer = conn->ibc_peer;
2638         kib_connvars_t       *cv = conn->ibc_connvars;
2639         ibat_arp_data_t      *arp = &cv->cv_arp;
2640         ib_path_record_v2_t  *path = &cv->cv_path;
2641         vv_return_t           vvrc;
2642         int                   rc;
2643
2644         /* Only called by connd => statics OK */
2645         LASSERT (!in_interrupt());
2646         LASSERT (current == kibnal_data.kib_connd);
2647         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_ARP);
2648         
2649         if (cv->cv_arprc != ibat_stat_ok) {
2650                 CERROR("Can't Arp "LPX64"@%u.%u.%u.%u: %d\n", peer->ibp_nid,
2651                        HIPQUAD(peer->ibp_ip), cv->cv_arprc);
2652                 kibnal_connreq_done(conn, 1, -ENETUNREACH);
2653                 return;
2654         }
2655
2656         if ((arp->mask & IBAT_PRI_PATH_VALID) != 0) {
2657                 CDEBUG(D_NET, "Got valid path for "LPX64"\n", peer->ibp_nid);
2658
2659                 *path = *arp->primary_path;
2660
2661                 vvrc = base_gid2port_num(kibnal_data.kib_hca, &path->sgid,
2662                                          &cv->cv_port);
2663                 LASSERT (vvrc == vv_return_ok);
2664
2665                 vvrc = gid2gid_index(kibnal_data.kib_hca, cv->cv_port,
2666                                      &path->sgid, &cv->cv_sgid_index);
2667                 LASSERT (vvrc == vv_return_ok);
2668
2669                 vvrc = pkey2pkey_index(kibnal_data.kib_hca, cv->cv_port,
2670                                        path->pkey, &cv->cv_pkey_index);
2671                 LASSERT (vvrc == vv_return_ok);
2672
2673                 path->mtu = IBNAL_IB_MTU;
2674
2675         } else if ((arp->mask & IBAT_LID_VALID) != 0) {
2676                 CWARN("Creating new path record for "LPX64"@%u.%u.%u.%u\n",
2677                       peer->ibp_nid, HIPQUAD(peer->ibp_ip));
2678
2679                 cv->cv_pkey_index = IBNAL_PKEY_IDX;
2680                 cv->cv_sgid_index = IBNAL_SGID_IDX;
2681                 cv->cv_port = arp->local_port_num;
2682
2683                 memset(path, 0, sizeof(*path));
2684
2685                 vvrc = port_num2base_gid(kibnal_data.kib_hca, cv->cv_port,
2686                                          &path->sgid);
2687                 LASSERT (vvrc == vv_return_ok);
2688
2689                 vvrc = port_num2base_lid(kibnal_data.kib_hca, cv->cv_port,
2690                                          &path->slid);
2691                 LASSERT (vvrc == vv_return_ok);
2692
2693                 path->dgid          = arp->gid;
2694                 path->sl            = IBNAL_SERVICE_LEVEL;
2695                 path->dlid          = arp->lid;
2696                 path->mtu           = IBNAL_IB_MTU;
2697                 path->rate          = IBNAL_STATIC_RATE;
2698                 path->pkt_life_time = IBNAL_PKT_LIFETIME;
2699                 path->pkey          = IBNAL_PKEY;
2700                 path->traffic_class = IBNAL_TRAFFIC_CLASS;
2701         } else {
2702                 CERROR("Can't Arp "LPX64"@%u.%u.%u.%u: no PATH or LID\n", 
2703                        peer->ibp_nid, HIPQUAD(peer->ibp_ip));
2704                 kibnal_connreq_done(conn, 1, -ENETUNREACH);
2705                 return;
2706         }
2707
        rc = kibnal_set_qp_state(conn, vv_qp_state_init);
        if (rc != 0) {
                kibnal_connreq_done(conn, 1, rc);
                return;              /* conn is finalised; mustn't connect */
        }

        /* do the actual connection request */
        kibnal_connect_conn(conn);
}

void
kibnal_arp_callback (ibat_stat_t arprc, ibat_arp_data_t *arp_data, void *arg)
{
        /* CAVEAT EMPTOR: tasklet context */
        kib_conn_t      *conn = (kib_conn_t *)arg;
        kib_peer_t      *peer = conn->ibc_peer;
        unsigned long    flags;

        CDEBUG(D_NET, "Arp "LPX64"@%u.%u.%u.%u rc %d LID %s PATH %s\n",
               peer->ibp_nid, HIPQUAD(peer->ibp_ip), arprc,
               (arp_data->mask & IBAT_LID_VALID) == 0 ? "invalid" : "valid",
               (arp_data->mask & IBAT_PRI_PATH_VALID) == 0 ? "invalid" : "valid");
        LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_ARP);

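        /* Can't call kibnal_send_connreq() from tasklet context: stash
         * the ARP result and hand the conn over to the connd thread. */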
        conn->ibc_connvars->cv_arprc = arprc;
        conn->ibc_connvars->cv_arp = *arp_data;

        /* connd takes over my ref on conn */
        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);

        list_add_tail(&conn->ibc_list, &kibnal_data.kib_connd_conns);
        wake_up(&kibnal_data.kib_connd_waitq);

        spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
}

void
kibnal_arp_peer (kib_peer_t *peer)
{
        cm_cep_handle_t  cep;
        kib_conn_t      *conn;
        int              ibatrc;

        /* Only the connd does this (i.e. single threaded) */
        LASSERT (current == kibnal_data.kib_connd);
        LASSERT (peer->ibp_connecting != 0);

        cep = cm_create_cep(cm_cep_transp_rc);
        if (cep == NULL) {
                CERROR ("Can't create cep for conn->"LPX64"\n",
                        peer->ibp_nid);
                kibnal_peer_connect_failed(peer, 1);
                return;
        }

        conn = kibnal_create_conn(cep);
        if (conn == NULL) {
                CERROR ("Can't allocate conn->"LPX64"\n",
                        peer->ibp_nid);
                cm_destroy_cep(cep);
                kibnal_peer_connect_failed(peer, 1);
                return;
        }

        conn->ibc_peer = peer;
        kibnal_peer_addref(peer);

        kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_ARP);

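        /* ibat_get_ib_data() may complete synchronously (ARP cache hit)
         * or asynchronously via kibnal_arp_callback(); the switch below
         * handles both outcomes. */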
        ibatrc = ibat_get_ib_data(htonl(peer->ibp_ip), INADDR_ANY,
                                  ibat_paths_primary,
                                  &conn->ibc_connvars->cv_arp,
                                  kibnal_arp_callback, conn, 0);
        CDEBUG(D_NET, "ibatrc %d\n", ibatrc);
        switch (ibatrc) {
        default:
                LBUG();

        case ibat_stat_pending:
                /* NB callback has my ref on conn */
                break;

        case ibat_stat_ok:
                /* Immediate return (ARP cache hit) == no callback. */
                kibnal_send_connreq(conn);
                kibnal_conn_decref(conn);
                break;

        case ibat_stat_error:
        case ibat_stat_timeout:
        case ibat_stat_not_found:
                CERROR("Arp "LPX64"@%u.%u.%u.%u failed: %d\n", peer->ibp_nid,
                       HIPQUAD(peer->ibp_ip), ibatrc);
                kibnal_connreq_done(conn, 1, -ENETUNREACH);
                kibnal_conn_decref(conn);
                break;
        }
}

int
kibnal_conn_timed_out (kib_conn_t *conn)
{
        kib_tx_t          *tx;
        struct list_head  *ttmp;

        spin_lock(&conn->ibc_lock);

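        /* A tx on ibc_tx_queue hasn't been posted yet; a tx on
         * ibc_active_txs is awaiting completion or a peer response.
         * Either way, a missed deadline means the connection is dead. */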
        list_for_each (ttmp, &conn->ibc_tx_queue) {
                tx = list_entry (ttmp, kib_tx_t, tx_list);

                if (time_after_eq (jiffies, tx->tx_deadline)) {
                        spin_unlock(&conn->ibc_lock);
                        return 1;
                }
        }

        list_for_each (ttmp, &conn->ibc_active_txs) {
                tx = list_entry (ttmp, kib_tx_t, tx_list);

                LASSERT (tx->tx_waiting ||
                         tx->tx_sending != 0);

                if (time_after_eq (jiffies, tx->tx_deadline)) {
                        spin_unlock(&conn->ibc_lock);
                        return 1;
                }
        }

        spin_unlock(&conn->ibc_lock);
        return 0;
}

void
kibnal_check_conns (int idx)
{
        struct list_head  *peers = &kibnal_data.kib_peers[idx];
        struct list_head  *ptmp;
        kib_peer_t        *peer;
        kib_conn_t        *conn;
        struct list_head  *ctmp;
        unsigned long      flags;

 again:
        /* NB. We expect to have a look at all the peers and not find any
         * rdmas to time out, so we just use a shared lock while we
         * take a look... */
        read_lock_irqsave(&kibnal_data.kib_global_lock, flags);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, kib_peer_t, ibp_list);

                list_for_each (ctmp, &peer->ibp_conns) {
                        conn = list_entry (ctmp, kib_conn_t, ibc_list);

                        LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);

                        /* In case we have enough credits to return via a
                         * NOOP, but there were no non-blocking tx descs
                         * free to do it last time... */
                        kibnal_check_sends(conn);

                        if (!kibnal_conn_timed_out(conn))
                                continue;

                        /* Handle timeout by closing the whole connection.  We
                         * can only be sure RDMA activity has ceased once the
                         * QP has been modified. */

                        kibnal_conn_addref(conn); /* 1 ref for me... */

                        read_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                               flags);

                        CERROR("Timed out RDMA with "LPX64"\n",
                               peer->ibp_nid);

                        kibnal_close_conn (conn, -ETIMEDOUT);
                        kibnal_conn_decref(conn); /* ...until here */

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
}

void
kibnal_disconnect_conn (kib_conn_t *conn)
{
        static cm_drequest_data_t dreq;         /* just for the space */

        cm_return_t    cmrc;
        unsigned long  flags;

        LASSERT (!in_interrupt());
        LASSERT (current == kibnal_data.kib_connd);

        write_lock_irqsave(&kibnal_data.kib_global_lock, flags);

        if (conn->ibc_disconnect) {
                /* Had the CM callback already */
                write_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                        flags);
                kibnal_conn_disconnected(conn);
                return;
        }

        LASSERT (conn->ibc_state == IBNAL_CONN_DISCONNECT1);

        /* active disconnect */
        cmrc = cm_disconnect(conn->ibc_cep, &dreq, NULL);
        if (cmrc == cm_stat_success) {
                /* waiting for CM */
                conn->ibc_state = IBNAL_CONN_DISCONNECT2;
                write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
                return;
        }

        write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);

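        /* cm_disconnect() failed: cancel the CEP and pause briefly in
         * case the CM callback is already in flight.  If it never ran,
         * drop the ref it would otherwise have consumed. */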
        cm_cancel(conn->ibc_cep);
        kibnal_pause(HZ/10);

        if (!conn->ibc_disconnect)              /* CM callback will never happen now */
                kibnal_conn_decref(conn);

        LASSERT (atomic_read(&conn->ibc_refcount) > 0);
        LASSERT (conn->ibc_state == IBNAL_CONN_DISCONNECT1);

        kibnal_conn_disconnected(conn);
}

int
kibnal_connd (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        kib_pcreq_t       *pcr;
        kib_conn_t        *conn;
        kib_peer_t        *peer;
        int                timeout;
        int                i;
        int                dropped_lock;
        int                peer_index = 0;
        unsigned long      deadline = jiffies;

        kportal_daemonize ("kibnal_connd");
        kportal_blockallsigs ();

        init_waitqueue_entry (&wait, current);
        kibnal_data.kib_connd = current;

        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);

        while (!kibnal_data.kib_shutdown) {

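                /* Handle at most one item from each work list per pass;
                 * every handler drops kib_connd_lock to do its work, so
                 * note that and rescan everything before sleeping. */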
                dropped_lock = 0;

                if (!list_empty (&kibnal_data.kib_connd_zombies)) {
                        conn = list_entry (kibnal_data.kib_connd_zombies.next,
                                           kib_conn_t, ibc_list);
                        list_del (&conn->ibc_list);

                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
                        dropped_lock = 1;

                        kibnal_destroy_conn(conn);

                        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                }

                if (!list_empty (&kibnal_data.kib_connd_pcreqs)) {
                        pcr = list_entry(kibnal_data.kib_connd_pcreqs.next,
                                         kib_pcreq_t, pcr_list);
                        list_del(&pcr->pcr_list);

                        spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
                        dropped_lock = 1;

                        kibnal_recv_connreq(pcr->pcr_cep, &pcr->pcr_cmreq);
                        PORTAL_FREE(pcr, sizeof(*pcr));

                        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
                }

                if (!list_empty (&kibnal_data.kib_connd_peers)) {
                        peer = list_entry (kibnal_data.kib_connd_peers.next,
                                           kib_peer_t, ibp_connd_list);

                        list_del_init (&peer->ibp_connd_list);
                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
                        dropped_lock = 1;

                        kibnal_arp_peer (peer);
                        kibnal_peer_decref (peer);

                        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                }

                if (!list_empty (&kibnal_data.kib_connd_conns)) {
                        conn = list_entry (kibnal_data.kib_connd_conns.next,
                                           kib_conn_t, ibc_list);
                        list_del (&conn->ibc_list);

                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
                        dropped_lock = 1;

                        switch (conn->ibc_state) {
                        default:
                                LBUG();

                        case IBNAL_CONN_ACTIVE_ARP:
                                kibnal_send_connreq(conn);
                                break;

                        case IBNAL_CONN_ACTIVE_CONNECT:
                                kibnal_check_connreply(conn);
                                break;

                        case IBNAL_CONN_PASSIVE_WAIT:
                                kibnal_check_passive_wait(conn);
                                break;

                        case IBNAL_CONN_DISCONNECT1:
                        case IBNAL_CONN_DISCONNECT2:
                                kibnal_disconnect_conn(conn);
                                break;
                        }
                        kibnal_conn_decref(conn);

                        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                }

                /* careful with the jiffy wrap... */
                timeout = (int)(deadline - jiffies);
                if (timeout <= 0) {
                        const int n = 4;
                        const int p = 1;
                        int       chunk = kibnal_data.kib_peer_hash_size;

                        spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
                        dropped_lock = 1;

                        /* Time to check for RDMA timeouts on a few more
                         * peers: I do checks every 'p' seconds on a
                         * proportion of the peer table and I need to check
                         * every connection 'n' times within a timeout
                         * interval, to ensure I detect a timeout on any
                         * connection within (n+1)/n times the timeout
                         * interval. */
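                        /* Illustrative numbers only: with a 1024-bucket
                         * peer hash and kib_io_timeout = 64 seconds,
                         * chunk = 1024 * 4 * 1 / 64 = 64 buckets are
                         * scanned on each pass. */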

                        if (kibnal_tunables.kib_io_timeout > n * p)
                                chunk = (chunk * n * p) /
                                        kibnal_tunables.kib_io_timeout;
                        if (chunk == 0)
                                chunk = 1;

                        for (i = 0; i < chunk; i++) {
                                kibnal_check_conns (peer_index);
                                peer_index = (peer_index + 1) %
                                             kibnal_data.kib_peer_hash_size;
                        }

                        deadline += p * HZ;
                        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
                }

                if (dropped_lock)
                        continue;

                /* Nothing to do: sleep until 'timeout' expires or work arrives */
                set_current_state (TASK_INTERRUPTIBLE);
                add_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
                spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

                schedule_timeout (timeout);

                set_current_state (TASK_RUNNING);
                remove_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
                spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
        }

        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

        kibnal_thread_fini ();
        return (0);
}

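/* HCA asynchronous event handler: nothing useful can be done with
 * these events yet, so they are just logged. */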
void
kibnal_async_callback(vv_event_record_t ev)
{
        CERROR("type: %d, port: %d, data: "LPX64"\n",
               ev.event_type, ev.port_num, ev.type.data);
}

void
kibnal_cq_callback (unsigned long unused_context)
{
        unsigned long    flags;

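        /* Presumably tasklet/interrupt context: just flag that the CQ
         * needs polling and wake a scheduler to do the real work. */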
        CDEBUG(D_NET, "!!\n");

        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
        kibnal_data.kib_ready = 1;
        wake_up(&kibnal_data.kib_sched_waitq);
        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
}

int
kibnal_scheduler(void *arg)
{
        long            id = (long)arg;
        wait_queue_t    wait;
        char            name[16];
        vv_wc_t         wc;
        vv_return_t     vvrc;
        vv_return_t     vvrc2;
        unsigned long   flags;
        int             busy_loops = 0;

        snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
        kportal_daemonize(name);
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);

        while (!kibnal_data.kib_shutdown) {
                if (busy_loops++ >= IBNAL_RESCHED) {
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);

                        our_cond_resched();
                        busy_loops = 0;

                        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
                }

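                /* kib_ready is set by the CQ callback; kib_checking_cq
                 * ensures only one scheduler polls the CQ at a time
                 * while the others stay free to handle completions. */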
                if (kibnal_data.kib_ready &&
                    !kibnal_data.kib_checking_cq) {
                        /* take ownership of completion polling */
                        kibnal_data.kib_checking_cq = 1;
                        /* Assume I'll exhaust the CQ */
                        kibnal_data.kib_ready = 0;
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);

                        vvrc = vv_poll_for_completion(kibnal_data.kib_hca,
                                                      kibnal_data.kib_cq, &wc);
                        if (vvrc == vv_return_err_cq_empty) {
                                vvrc2 = vv_request_completion_notification(
                                        kibnal_data.kib_hca,
                                        kibnal_data.kib_cq,
                                        vv_next_solicit_unsolicit_event);
                                LASSERT (vvrc2 == vv_return_ok);
                        }

                        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
                        /* give up ownership of completion polling */
                        kibnal_data.kib_checking_cq = 0;

                        if (vvrc == vv_return_err_cq_empty)
                                continue;

                        LASSERT (vvrc == vv_return_ok);
                        /* Assume there's more: get another scheduler to check
                         * while I handle this completion... */

                        kibnal_data.kib_ready = 1;
                        wake_up(&kibnal_data.kib_sched_waitq);

                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);

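                        /* wr_id holds the descriptor pointer: receive
                         * completions finish an rx; send and RDMA-write
                         * completions finish a tx. */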
                        switch (wc.operation_type) {
                        case vv_wc_send_rq:
                                kibnal_rx_complete((kib_rx_t *)((unsigned long)wc.wr_id),
                                                   wc.completion_status,
                                                   wc.num_bytes_transfered);
                                break;
                        case vv_wc_send_sq:
                                kibnal_tx_complete((kib_tx_t *)((unsigned long)wc.wr_id),
                                                   1, wc.completion_status);
                                break;
                        case vv_wc_rdma_write_sq:
                                kibnal_tx_complete((kib_tx_t *)((unsigned long)wc.wr_id),
                                                   0, wc.completion_status);
                                break;
                        default:
                                LBUG();
                        }

                        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
                        continue;
                }

                /* Nothing to do; sleep... */

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&kibnal_data.kib_sched_waitq, &wait);
                spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                       flags);

                schedule();

                remove_wait_queue(&kibnal_data.kib_sched_waitq, &wait);
                set_current_state(TASK_RUNNING);
                spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
        }

        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);

        kibnal_thread_fini();
        return (0);
}


lib_nal_t kibnal_lib = {
        .libnal_data = &kibnal_data,      /* NAL private data */
        .libnal_send = kibnal_send,
        .libnal_send_pages = kibnal_send_pages,
        .libnal_recv = kibnal_recv,
        .libnal_recv_pages = kibnal_recv_pages,
        .libnal_dist = kibnal_dist
};