/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *   Author: Frank Zago <fzago@systemfabricworks.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "vibnal.h"

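/* Tx descriptor lifecycle: descriptors are taken from one of the idle
 * pools (kibnal_get_idle_tx), filled in, queued on a connection and
 * finally retired here once they are neither being sent (tx_sending == 0)
 * nor awaiting a peer response (!tx_waiting). */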
void
kibnal_tx_done (kib_tx_t *tx)
{
        ptl_err_t        ptlrc = (tx->tx_status == 0) ? PTL_OK : PTL_FAIL;
        int              i;

        LASSERT (!in_interrupt());
        LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting callback */
        LASSERT (!tx->tx_waiting);              /* mustn't be awaiting peer response */

#if !IBNAL_WHOLE_MEM
        switch (tx->tx_mapped) {
        default:
                LBUG();

        case KIB_TX_UNMAPPED:
                break;

        case KIB_TX_MAPPED: {
                vv_return_t      vvrc;

                vvrc = vv_mem_region_destroy(kibnal_data.kib_hca,
                                             tx->tx_md.md_handle);
                LASSERT (vvrc == vv_return_ok);
                tx->tx_mapped = KIB_TX_UNMAPPED;
                break;
        }
        }
#endif
        for (i = 0; i < 2; i++) {
                /* tx may have up to 2 libmsgs to finalise */
                if (tx->tx_libmsg[i] == NULL)
                        continue;

                lib_finalize (&kibnal_lib, NULL, tx->tx_libmsg[i], ptlrc);
                tx->tx_libmsg[i] = NULL;
        }

        if (tx->tx_conn != NULL) {
                kibnal_conn_decref(tx->tx_conn);
                tx->tx_conn = NULL;
        }

        tx->tx_nwrq = 0;
        tx->tx_status = 0;

        spin_lock(&kibnal_data.kib_tx_lock);

        if (tx->tx_isnblk) {
                list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_nblk_txs);
        } else {
                list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_txs);
                wake_up (&kibnal_data.kib_idle_tx_waitq);
        }

        spin_unlock(&kibnal_data.kib_tx_lock);
}

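/* Grab an idle tx descriptor.  Blocking callers wait for a "normal"
 * descriptor to be freed; non-blocking callers (may_block == 0) may dip
 * into the reserve (nblk) pool instead, but never sleep. */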
kib_tx_t *
kibnal_get_idle_tx (int may_block)
{
        kib_tx_t      *tx = NULL;
        ENTRY;

        for (;;) {
                spin_lock(&kibnal_data.kib_tx_lock);

                /* "normal" descriptor is free */
                if (!list_empty (&kibnal_data.kib_idle_txs)) {
                        tx = list_entry (kibnal_data.kib_idle_txs.next,
                                         kib_tx_t, tx_list);
                        break;
                }

                if (!may_block) {
                        /* may dip into reserve pool */
                        if (list_empty (&kibnal_data.kib_idle_nblk_txs)) {
                                CERROR ("reserved tx desc pool exhausted\n");
                                break;
                        }

                        tx = list_entry (kibnal_data.kib_idle_nblk_txs.next,
                                         kib_tx_t, tx_list);
                        break;
                }

                /* block for idle tx */
                spin_unlock(&kibnal_data.kib_tx_lock);

                wait_event (kibnal_data.kib_idle_tx_waitq,
                            !list_empty (&kibnal_data.kib_idle_txs) ||
                            kibnal_data.kib_shutdown);
        }

        if (tx != NULL) {
                list_del (&tx->tx_list);

                /* Allocate a new completion cookie.  It might not be needed,
                 * but we've got a lock right now and we're unlikely to
                 * wrap... */
                tx->tx_cookie = kibnal_data.kib_next_tx_cookie++;
#if !IBNAL_WHOLE_MEM
                LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
#endif
                LASSERT (tx->tx_nwrq == 0);
                LASSERT (tx->tx_sending == 0);
                LASSERT (!tx->tx_waiting);
                LASSERT (tx->tx_status == 0);
                LASSERT (tx->tx_conn == NULL);
                LASSERT (tx->tx_libmsg[0] == NULL);
                LASSERT (tx->tx_libmsg[1] == NULL);
        }

        spin_unlock(&kibnal_data.kib_tx_lock);

        RETURN(tx);
}

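/* (Re)post a receive buffer.  If 'credit' is set, the buffer being
 * replaced carried a flow control credit, which is owed back to the peer
 * and piggybacked on the next transmit (see kibnal_check_sends). */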
int
kibnal_post_rx (kib_rx_t *rx, int credit)
{
        kib_conn_t   *conn = rx->rx_conn;
        int           rc = 0;
        vv_return_t   vvrc;

        LASSERT (!in_interrupt());

        rx->rx_gl = (vv_scatgat_t) {
                .v_address = (void *)((unsigned long)KIBNAL_RX_VADDR(rx)),
                .l_key     = KIBNAL_RX_LKEY(rx),
                .length    = IBNAL_MSG_SIZE,
        };

        rx->rx_wrq = (vv_wr_t) {
                .wr_id                   = kibnal_ptr2wreqid(rx, IBNAL_WID_RX),
                .completion_notification = 1,
                .scatgat_list            = &rx->rx_gl,
                .num_of_data_segments    = 1,
                .wr_type                 = vv_wr_receive,
        };

        LASSERT (conn->ibc_state >= IBNAL_CONN_INIT);
        LASSERT (!rx->rx_posted);

        CDEBUG(D_NET, "posting rx [%d %x %p]\n",
               rx->rx_wrq.scatgat_list->length,
               rx->rx_wrq.scatgat_list->l_key,
               rx->rx_wrq.scatgat_list->v_address);

        if (conn->ibc_state > IBNAL_CONN_ESTABLISHED) {
                /* No more posts for this rx; so lose its ref */
                kibnal_conn_decref(conn);
                return 0;
        }

        rx->rx_posted = 1;

        spin_lock(&conn->ibc_lock);
        /* Serialise vv_post_receive; it's not re-entrant on the same QP */
        vvrc = vv_post_receive(kibnal_data.kib_hca,
                               conn->ibc_qp, &rx->rx_wrq);
        spin_unlock(&conn->ibc_lock);

        if (vvrc == vv_return_ok) {
                if (credit) {
                        spin_lock(&conn->ibc_lock);
                        conn->ibc_outstanding_credits++;
                        spin_unlock(&conn->ibc_lock);

                        kibnal_check_sends(conn);
                }
                return 0;
        }

        CERROR ("post rx -> "LPX64" failed %d\n",
                conn->ibc_peer->ibp_nid, vvrc);
        rc = -EIO;
        kibnal_close_conn(rx->rx_conn, rc);
        /* No more posts for this rx; so lose its ref */
        kibnal_conn_decref(conn);
        return rc;
}

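/* Post the connection's full complement of receive buffers.  Each rx
 * holds a ref on the connection until kibnal_post_rx() finally fails
 * (actual failure or disconnect). */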
int
kibnal_post_receives (kib_conn_t *conn)
{
        int    i;
        int    rc;

        LASSERT (conn->ibc_state < IBNAL_CONN_ESTABLISHED);
        LASSERT (conn->ibc_comms_error == 0);

        for (i = 0; i < IBNAL_RX_MSGS; i++) {
                /* +1 ref for rx desc.  This ref remains until kibnal_post_rx
                 * fails (i.e. actual failure or we're disconnecting) */
                kibnal_conn_addref(conn);
                rc = kibnal_post_rx (&conn->ibc_rxs[i], 0);
                if (rc != 0)
                        return rc;
        }

        return 0;
}

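/* Find the active tx waiting for a completion with the given cookie;
 * caller must hold ibc_lock. */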
kib_tx_t *
kibnal_find_waiting_tx_locked(kib_conn_t *conn, int txtype, __u64 cookie)
{
        struct list_head   *tmp;

        list_for_each(tmp, &conn->ibc_active_txs) {
                kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);

                LASSERT (tx->tx_sending != 0 || tx->tx_waiting);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_waiting &&
                    tx->tx_msg->ibm_type == txtype)
                        return tx;

                CWARN("Bad completion: %swaiting, type %x (wanted %x)\n",
                      tx->tx_waiting ? "" : "NOT ",
                      tx->tx_msg->ibm_type, txtype);
        }
        return NULL;
}

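/* A completion (NAK/DONE) arrived from the peer: match it to the tx
 * (identified by the type of the request it sent, plus the cookie) that
 * is waiting for it, record the status and retire the tx if it is now
 * idle. */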
void
kibnal_handle_completion(kib_conn_t *conn, int txtype, int status, __u64 cookie)
{
        kib_tx_t    *tx;
        int          idle;

        spin_lock(&conn->ibc_lock);

        tx = kibnal_find_waiting_tx_locked(conn, txtype, cookie);
        if (tx == NULL) {
                spin_unlock(&conn->ibc_lock);

                CWARN("Unmatched completion type %x cookie "LPX64
                      " from "LPX64"\n",
                      txtype, cookie, conn->ibc_peer->ibp_nid);
                kibnal_close_conn (conn, -EPROTO);
                return;
        }

        if (tx->tx_status == 0) {               /* success so far */
                if (status < 0) {               /* failed? */
                        tx->tx_status = status;
                } else if (txtype == IBNAL_MSG_GET_REQ) {
                        /* XXX layering violation: set REPLY data length */
                        LASSERT (tx->tx_libmsg[1] != NULL);
                        LASSERT (tx->tx_libmsg[1]->ev.type ==
                                 PTL_EVENT_REPLY_END);

                        tx->tx_libmsg[1]->ev.mlength = status;
                }
        }

        tx->tx_waiting = 0;

        idle = tx->tx_sending == 0;
        if (idle)
                list_del(&tx->tx_list);

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kibnal_tx_done(tx);
}

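/* Send a completion (status + cookie) back to the peer.  This runs in
 * the scheduler, so it takes a reserve-pool tx rather than blocking. */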
void
kibnal_send_completion (kib_conn_t *conn, int type, int status, __u64 cookie)
{
        kib_tx_t    *tx = kibnal_get_idle_tx(0);

        if (tx == NULL) {
                CERROR("Can't get tx for completion %x for "LPX64"\n",
                       type, conn->ibc_peer->ibp_nid);
                return;
        }

        tx->tx_msg->ibm_u.completion.ibcm_status = status;
        tx->tx_msg->ibm_u.completion.ibcm_cookie = cookie;
        kibnal_init_tx_msg(tx, type, sizeof(kib_completion_msg_t));

        kibnal_queue_tx(tx, conn);
}

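/* Dispatch an incoming message.  Piggybacked credits are banked first,
 * since they may allow queued sends to proceed; the rx buffer is
 * re-posted (returning a credit) once dispatch completes. */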
void
kibnal_handle_rx (kib_rx_t *rx)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        int           credits = msg->ibm_credits;
        kib_tx_t     *tx;
        int           rc;

        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);

        CDEBUG (D_NET, "Received %x[%d] from "LPX64"\n",
                msg->ibm_type, credits, conn->ibc_peer->ibp_nid);

        if (credits != 0) {
                /* Have I received credits that will let me send? */
                spin_lock(&conn->ibc_lock);
                conn->ibc_credits += credits;
                spin_unlock(&conn->ibc_lock);

                kibnal_check_sends(conn);
        }

        switch (msg->ibm_type) {
        default:
                CERROR("Bad IBNAL message type %x from "LPX64"\n",
                       msg->ibm_type, conn->ibc_peer->ibp_nid);
                break;

        case IBNAL_MSG_NOOP:
                break;

        case IBNAL_MSG_IMMEDIATE:
                lib_parse(&kibnal_lib, &msg->ibm_u.immediate.ibim_hdr, rx);
                break;

        case IBNAL_MSG_PUT_REQ:
                rx->rx_responded = 0;
                lib_parse(&kibnal_lib, &msg->ibm_u.putreq.ibprm_hdr, rx);
                if (rx->rx_responded)
                        break;

                /* I wasn't asked to transfer any payload data.  This happens
                 * if the PUT didn't match, or got truncated. */
                kibnal_send_completion(rx->rx_conn, IBNAL_MSG_PUT_NAK, 0,
                                       msg->ibm_u.putreq.ibprm_cookie);
                break;

        case IBNAL_MSG_PUT_NAK:
                CWARN ("PUT_NAK from "LPX64"\n", conn->ibc_peer->ibp_nid);
                kibnal_handle_completion(conn, IBNAL_MSG_PUT_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBNAL_MSG_PUT_ACK:
                spin_lock(&conn->ibc_lock);
                tx = kibnal_find_waiting_tx_locked(conn, IBNAL_MSG_PUT_REQ,
                                                   msg->ibm_u.putack.ibpam_src_cookie);
                if (tx != NULL)
                        list_del(&tx->tx_list);
                spin_unlock(&conn->ibc_lock);

                if (tx == NULL) {
                        CERROR("Unmatched PUT_ACK from "LPX64"\n",
                               conn->ibc_peer->ibp_nid);
                        kibnal_close_conn(conn, -EPROTO);
                        break;
                }

                LASSERT (tx->tx_waiting);
                /* CAVEAT EMPTOR: I could be racing with tx_complete, but...
                 * (a) I can overwrite tx_msg since my peer has received it!
                 * (b) while tx_waiting is set, tx_complete() won't touch it.
                 */

                tx->tx_nwrq = 0;                /* overwrite PUT_REQ */

                rc = kibnal_init_rdma(tx, IBNAL_MSG_PUT_DONE,
                                      kibnal_rd_size(&msg->ibm_u.putack.ibpam_rd),
                                      &msg->ibm_u.putack.ibpam_rd,
                                      msg->ibm_u.putack.ibpam_dst_cookie);
                if (rc < 0)
                        CERROR("Can't setup rdma for PUT to "LPX64": %d\n",
                               conn->ibc_peer->ibp_nid, rc);

                spin_lock(&conn->ibc_lock);
                if (tx->tx_status == 0 && rc < 0)
                        tx->tx_status = rc;
                tx->tx_waiting = 0;             /* clear waiting and queue atomically */
                kibnal_queue_tx_locked(tx, conn);
                spin_unlock(&conn->ibc_lock);
                break;

        case IBNAL_MSG_PUT_DONE:
                kibnal_handle_completion(conn, IBNAL_MSG_PUT_ACK,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBNAL_MSG_GET_REQ:
                rx->rx_responded = 0;
                lib_parse(&kibnal_lib, &msg->ibm_u.get.ibgm_hdr, rx);
                if (rx->rx_responded)           /* I responded to the GET_REQ */
                        break;
                /* NB GET didn't match (I'd have responded even with no payload
                 * data) */
                kibnal_send_completion(rx->rx_conn, IBNAL_MSG_GET_DONE, -ENODATA,
                                       msg->ibm_u.get.ibgm_cookie);
                break;

        case IBNAL_MSG_GET_DONE:
                kibnal_handle_completion(conn, IBNAL_MSG_GET_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;
        }

        kibnal_post_rx(rx, 1);
}

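/* Receive completion handler: validate and unpack the message, park it
 * on ibc_early_rxs if the connection isn't established yet, otherwise
 * hand it to kibnal_handle_rx(). */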
void
kibnal_rx_complete (kib_rx_t *rx, vv_comp_status_t vvrc, int nob)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        unsigned long flags;
        int           rc;

        CDEBUG (D_NET, "rx %p conn %p\n", rx, conn);
        LASSERT (rx->rx_posted);
        rx->rx_posted = 0;

        if (conn->ibc_state > IBNAL_CONN_ESTABLISHED)
                goto ignore;

        if (vvrc != vv_comp_status_success) {
                CERROR("Rx from "LPX64" failed: %d\n",
                       conn->ibc_peer->ibp_nid, vvrc);
                goto failed;
        }

        rc = kibnal_unpack_msg(msg, nob);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from "LPX64"\n",
                        rc, conn->ibc_peer->ibp_nid);
                goto failed;
        }

        if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
            msg->ibm_srcstamp != conn->ibc_incarnation ||
            msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid ||
            msg->ibm_dststamp != kibnal_data.kib_incarnation) {
                CERROR ("Stale rx from "LPX64"\n",
                        conn->ibc_peer->ibp_nid);
                goto failed;
        }

        /* racing with connection establishment/teardown! */

        if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
                /* must check holding global lock to eliminate race */
                if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                        list_add_tail(&rx->rx_list, &conn->ibc_early_rxs);
                        write_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                                flags);
                        return;
                }
                write_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                        flags);
        }
        kibnal_handle_rx(rx);
        return;

 failed:
        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        kibnal_close_conn(conn, -EIO);
 ignore:
        /* Don't re-post rx & drop its ref on conn */
        kibnal_conn_decref(conn);
}

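/* Two RDMA mapping schemes follow.  With IBNAL_WHOLE_MEM the HCA maps
 * all of physical memory in one region, so fragments only need their
 * keys and (munged) addresses filling in; otherwise each tx explicitly
 * registers a memory region around its buffer and unmaps it again in
 * kibnal_tx_done(). */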
#if IBNAL_WHOLE_MEM
int
kibnal_append_rdfrag(kib_rdma_desc_t *rd, int active, struct page *page,
                     unsigned long page_offset, unsigned long len)
{
        kib_rdma_frag_t *frag = &rd->rd_frags[rd->rd_nfrag];
        vv_l_key_t       l_key;
        vv_r_key_t       r_key;
        __u64            addr;
        __u64            frag_addr;
        void            *ptr;
        vv_mem_reg_h_t   mem_h;
        vv_return_t      vvrc;

        if (rd->rd_nfrag >= IBNAL_MAX_RDMA_FRAGS) {
                CERROR ("Too many RDMA fragments\n");
                return -EMSGSIZE;
        }

#ifdef CONFIG_HIGHMEM
# error "This probably doesn't work because of over/underflow when casting between __u64 and void *..."
#endif
        /* Try to create an address that adapter-tavor will munge into a valid
         * network address, given how it maps all phys mem into 1 region */
        addr = page_to_phys(page) + page_offset + PAGE_OFFSET;

        vvrc = vv_get_gen_mr_attrib(kibnal_data.kib_hca,
                                    (void *)((unsigned long)addr),
                                    len, &mem_h, &l_key, &r_key);
        LASSERT (vvrc == vv_return_ok);

        if (active) {
                if (rd->rd_nfrag == 0) {
                        rd->rd_key = l_key;
                } else if (l_key != rd->rd_key) {
                        CERROR ("> 1 key for single RDMA desc\n");
                        return -EINVAL;
                }
                frag_addr = addr;
        } else {
                if (rd->rd_nfrag == 0) {
                        rd->rd_key = r_key;
                } else if (r_key != rd->rd_key) {
                        CERROR ("> 1 key for single RDMA desc\n");
                        return -EINVAL;
                }
                vv_va2advertise_addr(kibnal_data.kib_hca,
                                     (void *)((unsigned long)addr), &ptr);
                frag_addr = (unsigned long)ptr;
        }

        kibnal_rf_set(frag, frag_addr, len);

        CDEBUG(D_NET,"map frag [%d][%d %x %08x%08x] "LPX64"\n",
               rd->rd_nfrag, frag->rf_nob, rd->rd_key,
               frag->rf_addr_hi, frag->rf_addr_lo, frag_addr);

        rd->rd_nfrag++;
        return 0;
}

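/* Resolve a kernel virtual address to its page: handles vmalloc (and
 * kmap on HIGHMEM kernels) as well as directly mapped addresses. */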
struct page *
kibnal_kvaddr_to_page (unsigned long vaddr)
{
        struct page *page;

        if (vaddr >= VMALLOC_START &&
            vaddr < VMALLOC_END)
                page = vmalloc_to_page ((void *)vaddr);
#ifdef CONFIG_HIGHMEM
        else if (vaddr >= PKMAP_BASE &&
                 vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
                page = vmalloc_to_page ((void *)vaddr);
        /* in 2.4 ^ just walks the page tables */
#endif
        else
                page = virt_to_page (vaddr);

        return VALID_PAGE(page) ? page : NULL;
}

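/* Build an RDMA descriptor from an iovec, one page-contiguous fragment
 * at a time.  'active' (set when I initiate the RDMA) selects the local
 * rather than the remote key in kibnal_append_rdfrag(). */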
int
kibnal_setup_rd_iov(kib_tx_t *tx, kib_rdma_desc_t *rd,
                    vv_access_con_bit_mask_t access,
                    int niov, struct iovec *iov, int offset, int nob)
{
        /* active if I'm sending */
        int           active = ((access & vv_acc_r_mem_write) == 0);
        int           fragnob;
        int           rc;
        unsigned long vaddr;
        struct page  *page;
        int           page_offset;

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT ((rd != tx->tx_rd) == !active);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        rd->rd_nfrag = 0;
        do {
                LASSERT (niov > 0);

                vaddr = ((unsigned long)iov->iov_base) + offset;
                page_offset = vaddr & (PAGE_SIZE - 1);
                page = kibnal_kvaddr_to_page(vaddr);
                if (page == NULL) {
                        CERROR ("Can't find page\n");
                        return -EFAULT;
                }

                fragnob = min((int)(iov->iov_len - offset), nob);
                fragnob = min(fragnob, (int)PAGE_SIZE - page_offset);

                rc = kibnal_append_rdfrag(rd, active, page,
                                          page_offset, fragnob);
                if (rc != 0)
                        return rc;

                if (offset + fragnob < iov->iov_len) {
                        offset += fragnob;
                } else {
                        offset = 0;
                        iov++;
                        niov--;
                }
                nob -= fragnob;
        } while (nob > 0);

        return 0;
}

int
kibnal_setup_rd_kiov (kib_tx_t *tx, kib_rdma_desc_t *rd,
                      vv_access_con_bit_mask_t access,
                      int nkiov, ptl_kiov_t *kiov, int offset, int nob)
{
        /* active if I'm sending */
        int            active = ((access & vv_acc_r_mem_write) == 0);
        int            fragnob;
        int            rc;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT ((rd != tx->tx_rd) == !active);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        rd->rd_nfrag = 0;
        do {
                LASSERT (nkiov > 0);
                fragnob = min((int)(kiov->kiov_len - offset), nob);

                rc = kibnal_append_rdfrag(rd, active, kiov->kiov_page,
                                          kiov->kiov_offset + offset,
                                          fragnob);
                if (rc != 0)
                        return rc;

                offset = 0;
                kiov++;
                nkiov--;
                nob -= fragnob;
        } while (nob > 0);

        return 0;
}
#else
int
kibnal_setup_rd_iov (kib_tx_t *tx, kib_rdma_desc_t *rd,
                     vv_access_con_bit_mask_t access,
                     int niov, struct iovec *iov, int offset, int nob)
{
        /* active if I'm sending */
        int         active = ((access & vv_acc_r_mem_write) == 0);
        void       *vaddr;
        vv_return_t vvrc;

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
        LASSERT ((rd != tx->tx_rd) == !active);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR ("Can't map multiple vaddr fragments\n");
                return (-EMSGSIZE);
        }

        vaddr = (void *)(((unsigned long)iov->iov_base) + offset);
        tx->tx_md.md_addr = (__u64)((unsigned long)vaddr);

        vvrc = vv_mem_region_register(kibnal_data.kib_hca, vaddr, nob,
                                      kibnal_data.kib_pd, access,
                                      &tx->tx_md.md_handle,
                                      &tx->tx_md.md_lkey,
                                      &tx->tx_md.md_rkey);
        if (vvrc != vv_return_ok) {
                CERROR ("Can't map vaddr %p: %d\n", vaddr, vvrc);
                return -EFAULT;
        }

        tx->tx_mapped = KIB_TX_MAPPED;

        rd->rd_key = active ? tx->tx_md.md_lkey : tx->tx_md.md_rkey;
        rd->rd_nfrag = 1;
        kibnal_rf_set(&rd->rd_frags[0], tx->tx_md.md_addr, nob);

        return (0);
}

int
kibnal_setup_rd_kiov (kib_tx_t *tx, kib_rdma_desc_t *rd,
                      vv_access_con_bit_mask_t access,
                      int nkiov, ptl_kiov_t *kiov, int offset, int nob)
{
        /* active if I'm sending */
        int            active = ((access & vv_acc_r_mem_write) == 0);
        vv_return_t    vvrc;
        vv_phy_list_t  phys_pages;
        vv_phy_buf_t  *phys;
        int            page_offset;
        int            nphys;
        int            resid;
        int            phys_size;
        int            rc;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
        LASSERT ((rd != tx->tx_rd) == !active);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        phys_size = nkiov * sizeof (*phys);
        PORTAL_ALLOC(phys, phys_size);
        if (phys == NULL) {
                CERROR ("Can't allocate tmp phys\n");
                return (-ENOMEM);
        }

        page_offset = kiov->kiov_offset + offset;

        phys[0].start = kibnal_page2phys(kiov->kiov_page);
        phys[0].size = PAGE_SIZE;

        nphys = 1;
        resid = nob - (kiov->kiov_len - offset);

        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        int i;
                        /* Can't have gaps */
                        CERROR ("Can't make payload contiguous in I/O VM: "
                                "page %d, offset %d, len %d\n", nphys,
                                kiov->kiov_offset, kiov->kiov_len);

                        for (i = -nphys; i < nkiov; i++)
                                CERROR("kiov[%d] %p +%d for %d\n",
                                       i, kiov[i].kiov_page,
                                       kiov[i].kiov_offset,
                                       kiov[i].kiov_len);

                        rc = -EINVAL;
                        goto out;
                }

                LASSERT (nphys * sizeof (*phys) < phys_size);
                phys[nphys].start = kibnal_page2phys(kiov->kiov_page);
                phys[nphys].size = PAGE_SIZE;

                nphys++;
                resid -= PAGE_SIZE;
        }

#if 0
        CWARN ("nphys %d, nob %d, page_offset %d\n", nphys, nob, page_offset);
        for (i = 0; i < nphys; i++)
                CWARN ("   [%d] "LPX64"\n", i, phys[i]);
#endif

        vvrc = vv_phy_mem_region_register(kibnal_data.kib_hca,
                                          &phys_pages,
                                          IBNAL_RDMA_BASE,
                                          nphys,
                                          page_offset,
                                          kibnal_data.kib_pd,
                                          access,
                                          &tx->tx_md.md_handle,
                                          &tx->tx_md.md_addr,
                                          &tx->tx_md.md_lkey,
                                          &tx->tx_md.md_rkey);

        if (vvrc != vv_return_ok) {
                CERROR ("Can't map phys: %d\n", vvrc);
                rc = -EFAULT;
                goto out;
        }

        CDEBUG(D_NET, "Mapped %d pages %d bytes @ offset %d: "
               "lkey %x, rkey %x, addr "LPX64"\n",
               nphys, nob, page_offset, tx->tx_md.md_lkey, tx->tx_md.md_rkey,
               tx->tx_md.md_addr);

        tx->tx_mapped = KIB_TX_MAPPED;
        rc = 0;

        rd->rd_key = active ? tx->tx_md.md_lkey : tx->tx_md.md_rkey;
        rd->rd_nfrag = 1;
        kibnal_rf_set(&rd->rd_frags[0], tx->tx_md.md_addr, nob);

 out:
        PORTAL_FREE(phys, phys_size);
        return (rc);
}
#endif

kib_conn_t *
kibnal_find_conn_locked (kib_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->ibp_conns) {
                return (list_entry(tmp, kib_conn_t, ibc_list));
        }

        return (NULL);
}

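/* Work the connection's send queue.  A message may only be posted when a
 * send credit (i.e. a posted rx buffer on the peer) is available and
 * fewer than IBNAL_MSG_QUEUE_SIZE sends are outstanding.  The last
 * credit is reserved for a NOOP that returns accumulated credits to the
 * peer, and one is generated here when enough have built up. */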
void
kibnal_check_sends (kib_conn_t *conn)
{
        kib_tx_t       *tx;
        vv_return_t     vvrc;
        int             rc;
        int             done;

        /* Don't send anything until after the connection is established */
        if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                CDEBUG(D_NET, LPX64": too soon\n", conn->ibc_peer->ibp_nid);
                return;
        }

        spin_lock(&conn->ibc_lock);

        LASSERT (conn->ibc_nsends_posted <= IBNAL_MSG_QUEUE_SIZE);

        if (list_empty(&conn->ibc_tx_queue) &&
            conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER) {
                spin_unlock(&conn->ibc_lock);

                tx = kibnal_get_idle_tx(0);     /* don't block */
                if (tx != NULL)
                        kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);

                spin_lock(&conn->ibc_lock);

                if (tx != NULL)
                        kibnal_queue_tx_locked(tx, conn);
        }

        while (!list_empty (&conn->ibc_tx_queue)) {
                tx = list_entry (conn->ibc_tx_queue.next, kib_tx_t, tx_list);

                /* We rely on this for QP sizing */
                LASSERT (tx->tx_nwrq > 0 && tx->tx_nwrq <= 1 + IBNAL_MAX_RDMA_FRAGS);

                LASSERT (conn->ibc_outstanding_credits >= 0);
                LASSERT (conn->ibc_outstanding_credits <= IBNAL_MSG_QUEUE_SIZE);
                LASSERT (conn->ibc_credits >= 0);
                LASSERT (conn->ibc_credits <= IBNAL_MSG_QUEUE_SIZE);

                if (conn->ibc_nsends_posted == IBNAL_MSG_QUEUE_SIZE) {
                        CDEBUG(D_NET, LPX64": posted enough\n",
                               conn->ibc_peer->ibp_nid);
                        break;
                }

                if (conn->ibc_credits == 0) {   /* no credits */
                        CDEBUG(D_NET, LPX64": no credits\n",
                               conn->ibc_peer->ibp_nid);
                        break;
                }

                if (conn->ibc_credits == 1 &&   /* last credit reserved for */
                    conn->ibc_outstanding_credits == 0) { /* giving back credits */
                        CDEBUG(D_NET, LPX64": not using last credit\n",
                               conn->ibc_peer->ibp_nid);
                        break;
                }

                list_del (&tx->tx_list);

                if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
                    (!list_empty(&conn->ibc_tx_queue) ||
                     conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER)) {
                        /* redundant NOOP */
                        spin_unlock(&conn->ibc_lock);
                        kibnal_tx_done(tx);
                        spin_lock(&conn->ibc_lock);
                        CDEBUG(D_NET, LPX64": redundant noop\n",
                               conn->ibc_peer->ibp_nid);
                        continue;
                }

                kibnal_pack_msg(tx->tx_msg, conn->ibc_outstanding_credits,
                                conn->ibc_peer->ibp_nid, conn->ibc_incarnation);

                conn->ibc_outstanding_credits = 0;
                conn->ibc_nsends_posted++;
                conn->ibc_credits--;

                /* CAVEAT EMPTOR!  This tx could be the PUT_DONE of an RDMA
                 * PUT.  If so, it was first queued here as a PUT_REQ, sent and
                 * stashed on ibc_active_txs, matched by an incoming PUT_ACK,
                 * and then re-queued here.  It's (just) possible that
                 * tx_sending is non-zero if we've not done the tx_complete() from
                 * the first send; hence the ++ rather than = below. */
                tx->tx_sending++;

                list_add (&tx->tx_list, &conn->ibc_active_txs);

                /* Keep holding ibc_lock while posting sends on this
                 * connection; vv_post_send() isn't re-entrant on the same
                 * QP!! */

                LASSERT (tx->tx_nwrq > 0);

                rc = -ECONNABORTED;
                vvrc = vv_return_ok;
                if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                        tx->tx_status = 0;
                        vvrc = vv_post_send_list(kibnal_data.kib_hca,
                                                 conn->ibc_qp,
                                                 tx->tx_nwrq,
                                                 tx->tx_wrq,
                                                 vv_operation_type_send_rc);
                        rc = (vvrc == vv_return_ok) ? 0 : -EIO;
                }

                if (rc != 0) {
                        /* NB credits are transferred in the actual
                         * message, which can only be the last work item */
                        conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
                        conn->ibc_credits++;
                        conn->ibc_nsends_posted--;

                        tx->tx_status = rc;
                        tx->tx_waiting = 0;
                        tx->tx_sending--;

                        done = (tx->tx_sending == 0);
                        if (done)
                                list_del (&tx->tx_list);

                        spin_unlock(&conn->ibc_lock);

                        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
                                CERROR ("Error %d posting transmit to "LPX64"\n",
                                        vvrc, conn->ibc_peer->ibp_nid);
                        else
                                CDEBUG (D_NET, "Error %d posting transmit to "
                                        LPX64"\n", rc, conn->ibc_peer->ibp_nid);

                        kibnal_close_conn (conn, rc);

                        if (done)
                                kibnal_tx_done (tx);
                        return;
                }
        }

        spin_unlock(&conn->ibc_lock);
}

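/* Send completion handler.  NB the same tx may be posted more than once
 * (e.g. as a PUT_REQ, then re-queued as the PUT_DONE when the PUT_ACK
 * arrives), so it is only retired when its last send has completed and
 * no peer response is expected. */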
void
kibnal_tx_complete (kib_tx_t *tx, vv_comp_status_t vvrc)
{
        kib_conn_t   *conn = tx->tx_conn;
        int           failed = (vvrc != vv_comp_status_success);
        int           idle;

        CDEBUG(D_NET, "tx %p conn %p sending %d nwrq %d vvrc %d\n",
               tx, conn, tx->tx_sending, tx->tx_nwrq, vvrc);

        LASSERT (tx->tx_sending != 0);

        if (failed &&
            tx->tx_status == 0 &&
            conn->ibc_state == IBNAL_CONN_ESTABLISHED)
                CERROR ("Tx completion to "LPX64" failed: %d\n",
                        conn->ibc_peer->ibp_nid, vvrc);

        spin_lock(&conn->ibc_lock);

        /* I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'. */

        tx->tx_sending--;
        conn->ibc_nsends_posted--;

        if (failed) {
                tx->tx_waiting = 0;
                tx->tx_status = -EIO;
        }

        idle = (tx->tx_sending == 0) &&         /* This is the final callback */
               !tx->tx_waiting;                 /* Not waiting for peer */
        if (idle)
                list_del(&tx->tx_list);

        kibnal_conn_addref(conn);               /* 1 ref for me.... */

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kibnal_tx_done (tx);

        if (failed)
                kibnal_close_conn (conn, -EIO);
        else
                kibnal_check_sends(conn);

        kibnal_conn_decref(conn);               /* ...until here */
}

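/* Append the work item that sends the message itself.  This is always
 * the last work item of a tx, after any RDMA work items; credits are
 * carried in the message, so it must complete last. */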
void
kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob)
{
        vv_scatgat_t *gl = &tx->tx_gl[tx->tx_nwrq];
        vv_wr_t      *wrq = &tx->tx_wrq[tx->tx_nwrq];
        int           nob = offsetof (kib_msg_t, ibm_u) + body_nob;

        LASSERT (tx->tx_nwrq >= 0 &&
                 tx->tx_nwrq < (1 + IBNAL_MAX_RDMA_FRAGS));
        LASSERT (nob <= IBNAL_MSG_SIZE);

        kibnal_init_msg(tx->tx_msg, type, body_nob);

        *gl = (vv_scatgat_t) {
                .v_address = (void *)((unsigned long)KIBNAL_TX_VADDR(tx)),
                .l_key     = KIBNAL_TX_LKEY(tx),
                .length    = nob,
        };

        memset(wrq, 0, sizeof(*wrq));

        wrq->wr_id = kibnal_ptr2wreqid(tx, IBNAL_WID_TX);
        wrq->wr_type = vv_wr_send;
        wrq->scatgat_list = gl;
        wrq->num_of_data_segments = 1;
        wrq->completion_notification = 1;
        wrq->type.send.solicited_event = 1;
        wrq->type.send.immidiate_data_indicator = 0;
        wrq->type.send.send_qp_type.rc_type.fance_indicator = 0;

        tx->tx_nwrq++;
}

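/* Set up the RDMA work items for a PUT_DONE or GET_DONE, walking the
 * source and destination fragment lists in step and splitting work items
 * wherever either fragment ends.  Returns the number of bytes to
 * transfer, or -ve on error (in which case only the completion message
 * is sent). */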
int
kibnal_init_rdma (kib_tx_t *tx, int type, int nob,
                  kib_rdma_desc_t *dstrd, __u64 dstcookie)
{
        /* CAVEAT EMPTOR: this 'consumes' the frags in 'dstrd' */
        int              resid = nob;
        kib_msg_t       *ibmsg = tx->tx_msg;
        kib_rdma_desc_t *srcrd = tx->tx_rd;
        kib_rdma_frag_t *srcfrag;
        int              srcidx;
        kib_rdma_frag_t *dstfrag;
        int              dstidx;
        vv_scatgat_t    *gl;
        vv_wr_t         *wrq;
        int              wrknob;
        int              rc;

        /* Called by scheduler */
        LASSERT (!in_interrupt());

        LASSERT (type == IBNAL_MSG_GET_DONE ||
                 type == IBNAL_MSG_PUT_DONE);

        srcidx = dstidx = 0;
        srcfrag = &srcrd->rd_frags[0];
        dstfrag = &dstrd->rd_frags[0];
        rc = resid;

        while (resid > 0) {
                if (srcidx >= srcrd->rd_nfrag) {
                        CERROR("Src buffer exhausted: %d frags\n", srcidx);
                        rc = -EPROTO;
                        break;
                }

                if (dstidx == dstrd->rd_nfrag) {
                        CERROR("Dst buffer exhausted: %d frags\n", dstidx);
                        rc = -EPROTO;
                        break;
                }

                if (tx->tx_nwrq == IBNAL_MAX_RDMA_FRAGS) {
                        CERROR("RDMA too fragmented: %d/%d src %d/%d dst frags\n",
                               srcidx, srcrd->rd_nfrag,
                               dstidx, dstrd->rd_nfrag);
                        rc = -EMSGSIZE;
                        break;
                }

                wrknob = MIN(MIN(srcfrag->rf_nob, dstfrag->rf_nob), resid);

                gl = &tx->tx_gl[tx->tx_nwrq];
                gl->v_address = (void *)((unsigned long)kibnal_rf_addr(srcfrag));
                gl->length    = wrknob;
                gl->l_key     = srcrd->rd_key;

                wrq = &tx->tx_wrq[tx->tx_nwrq];

                wrq->wr_id = kibnal_ptr2wreqid(tx, IBNAL_WID_RDMA);
                wrq->completion_notification = 0;
                wrq->scatgat_list = gl;
                wrq->num_of_data_segments = 1;
                wrq->wr_type = vv_wr_rdma_write;
                wrq->type.send.solicited_event = 0;
                wrq->type.send.send_qp_type.rc_type.fance_indicator = 0;
                wrq->type.send.send_qp_type.rc_type.r_addr = kibnal_rf_addr(dstfrag);
                wrq->type.send.send_qp_type.rc_type.r_r_key = dstrd->rd_key;

                resid -= wrknob;
                if (wrknob < srcfrag->rf_nob) {
                        kibnal_rf_set(srcfrag,
                                      kibnal_rf_addr(srcfrag) + wrknob,
                                      srcfrag->rf_nob - wrknob);
                } else {
                        srcfrag++;
                        srcidx++;
                }

                if (wrknob < dstfrag->rf_nob) {
                        kibnal_rf_set(dstfrag,
                                      kibnal_rf_addr(dstfrag) + wrknob,
                                      dstfrag->rf_nob - wrknob);
                } else {
                        dstfrag++;
                        dstidx++;
                }

                tx->tx_nwrq++;
        }

        if (rc < 0)                             /* no RDMA if completing with failure */
                tx->tx_nwrq = 0;

        ibmsg->ibm_u.completion.ibcm_status = rc;
        ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;
        kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));

        return rc;
}

void
kibnal_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
{
        spin_lock(&conn->ibc_lock);
        kibnal_queue_tx_locked (tx, conn);
        spin_unlock(&conn->ibc_lock);

        kibnal_check_sends(conn);
}

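/* Commit a tx to 'nid'.  Fast path: find an existing connection under
 * the read lock.  Slow path: retake everything under the write lock and,
 * if there's still no connection, queue the tx on the peer and wake the
 * connd to establish one. */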
void
kibnal_launch_tx (kib_tx_t *tx, ptl_nid_t nid)
{
        kib_peer_t      *peer;
        kib_conn_t      *conn;
        unsigned long    flags;
        unsigned long    flags2;
        rwlock_t        *g_lock = &kibnal_data.kib_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
        LASSERT (tx->tx_nwrq > 0);              /* work items have been set up */

        read_lock_irqsave(g_lock, flags);

        peer = kibnal_find_peer_locked (nid);
        if (peer == NULL) {
                read_unlock_irqrestore(g_lock, flags);
                tx->tx_status = -EHOSTUNREACH;
                kibnal_tx_done (tx);
                return;
        }

        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                kibnal_conn_addref(conn);       /* 1 ref for me... */
                read_unlock_irqrestore(g_lock, flags);

                kibnal_queue_tx (tx, conn);
                kibnal_conn_decref(conn);       /* ...to here */
                return;
        }

        /* Making one or more connections; I'll need a write lock... */
        read_unlock(g_lock);
        write_lock(g_lock);

        peer = kibnal_find_peer_locked (nid);
        if (peer == NULL) {
                write_unlock_irqrestore(g_lock, flags);
                tx->tx_status = -EHOSTUNREACH;
                kibnal_tx_done (tx);
                return;
        }

        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kibnal_conn_addref(conn);       /* 1 ref for me... */
                write_unlock_irqrestore(g_lock, flags);

                kibnal_queue_tx (tx, conn);
                kibnal_conn_decref(conn);       /* ...until here */
                return;
        }

        if (peer->ibp_connecting == 0) {
                if (!time_after_eq(jiffies, peer->ibp_reconnect_time)) {
                        write_unlock_irqrestore(g_lock, flags);
                        tx->tx_status = -EHOSTUNREACH;
                        kibnal_tx_done (tx);
                        return;
                }

                peer->ibp_connecting = 1;
                kibnal_peer_addref(peer); /* extra ref for connd */

                /* NB mustn't clobber 'flags'; it still holds the state to
                 * restore when g_lock is finally dropped */
                spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags2);

                list_add_tail (&peer->ibp_connd_list,
                               &kibnal_data.kib_connd_peers);
                wake_up (&kibnal_data.kib_connd_waitq);

                spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags2);
        }

        /* A connection is being established; queue the message... */
        list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}

int
kibnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
{
        /* I would guess that if kibnal_get_peer (nid) == NULL,
           and we're not routing, then 'nid' is very distant :) */
        if (nal->libnal_ni.ni_pid.nid == nid) {
                *dist = 0;
        } else {
                *dist = 1;
        }

        return 0;
}

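/* Common send path for all message types.  Payloads that fit in an
 * immediate message are sent inline; larger PUT/GET payloads are mapped
 * for RDMA and negotiated with PUT_REQ/GET_REQ instead. */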
1302 ptl_err_t
1303 kibnal_sendmsg(lib_nal_t    *nal, 
1304                void         *private,
1305                lib_msg_t    *libmsg,
1306                ptl_hdr_t    *hdr, 
1307                int           type, 
1308                ptl_nid_t     nid, 
1309                ptl_pid_t     pid,
1310                unsigned int  payload_niov, 
1311                struct iovec *payload_iov, 
1312                ptl_kiov_t   *payload_kiov,
1313                int           payload_offset,
1314                int           payload_nob)
1315 {
1316         kib_msg_t  *ibmsg;
1317         kib_tx_t   *tx;
1318         int         nob;
1319         int         rc;
1320         int         n;
1321
1322         /* NB 'private' is different depending on what we're sending.... */
1323
1324         CDEBUG(D_NET, "sending %d bytes in %d frags to nid:"LPX64
1325                " pid %d\n", payload_nob, payload_niov, nid , pid);
1326
1327         LASSERT (payload_nob == 0 || payload_niov > 0);
1328         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1329
1330         /* Thread context */
1331         LASSERT (!in_interrupt());
1332         /* payload is either all vaddrs or all pages */
1333         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1334
1335         switch (type) {
1336         default:
1337                 LBUG();
1338                 return (PTL_FAIL);
1339                 
1340         case PTL_MSG_REPLY: {
1341                 /* reply's 'private' is the incoming receive */
1342                 kib_rx_t *rx = private;
1343
1344                 LASSERT(rx != NULL);
1345
1346                 if (rx->rx_msg->ibm_type == IBNAL_MSG_IMMEDIATE) {
1347                         /* RDMA not expected */
1348                         nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1349                         if (nob > IBNAL_MSG_SIZE) {
1350                                 CERROR("REPLY for "LPX64" too big (RDMA not requested):"
1351                                        "%d (max for message is %d)\n", 
1352                                        nid, payload_nob, IBNAL_MSG_SIZE);
1353                                 CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
1354                                        nob, nid);
1355                                 return PTL_FAIL;
1356                         }
1357                         break;
1358                 }
1359
1360                 /* Incoming message consistent with RDMA? */
1361                 if (rx->rx_msg->ibm_type != IBNAL_MSG_GET_REQ) {
1362                         CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
1363                                nid, rx->rx_msg->ibm_type);
1364                         return PTL_FAIL;
1365                 }
1366
1367                 /* NB rx_complete() will send GET_NAK when I return to it from
1368                  * here, unless I set rx_responded! */
1369
1370                 tx = kibnal_get_idle_tx(0);
1371                 if (tx == NULL) {
1372                         CERROR("Can't get tx for REPLY to "LPX64"\n", nid);
1373                         return PTL_FAIL;
1374                 }
1375
1376                 if (payload_nob == 0)
1377                         rc = 0;
1378                 else if (payload_kiov == NULL)
1379                         rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 0, 
1380                                                  payload_niov, payload_iov, 
1381                                                  payload_offset, payload_nob);
1382                 else
1383                         rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 0,
1384                                                   payload_niov, payload_kiov,
1385                                                   payload_offset, payload_nob);
1386                 if (rc != 0) {
1387                         CERROR("Can't setup GET src for "LPX64": %d\n", nid, rc);
1388                         kibnal_tx_done(tx);
1389                         return PTL_FAIL;
1390                 }
1391                 
1392                 rc = kibnal_init_rdma(tx, IBNAL_MSG_GET_DONE, payload_nob,
1393                                       &rx->rx_msg->ibm_u.get.ibgm_rd,
1394                                       rx->rx_msg->ibm_u.get.ibgm_cookie);
1395                 if (rc < 0) {
1396                         CERROR("Can't setup rdma for GET from "LPX64": %d\n", 
1397                                nid, rc);
1398                 } else if (rc == 0) {
1399                         /* No RDMA: local completion may happen now! */
1400                         lib_finalize (&kibnal_lib, NULL, libmsg, PTL_OK);
1401                 } else {
1402                         /* RDMA: lib_finalize(libmsg) when it completes */
1403                         tx->tx_libmsg[0] = libmsg;
1404                 }
1405
1406                 kibnal_queue_tx(tx, rx->rx_conn);
1407                 rx->rx_responded = 1;
1408                 return (rc >= 0) ? PTL_OK : PTL_FAIL;
1409         }
1410
1411         case PTL_MSG_GET:
1412                 /* will the REPLY message be small enough not to need RDMA? */
1413                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[libmsg->md->length]);
1414                 if (nob <= IBNAL_MSG_SIZE)
1415                         break;
1416
1417                 tx = kibnal_get_idle_tx(1);     /* may block; caller is an app thread */
1418                 LASSERT (tx != NULL);
1419
1420                 ibmsg = tx->tx_msg;
1421                 ibmsg->ibm_u.get.ibgm_hdr = *hdr;
1422                 ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie;
1423
1424                 if ((libmsg->md->options & PTL_MD_KIOV) == 0)
1425                         rc = kibnal_setup_rd_iov(tx, &ibmsg->ibm_u.get.ibgm_rd,
1426                                                  vv_acc_r_mem_write,
1427                                                  libmsg->md->md_niov,
1428                                                  libmsg->md->md_iov.iov,
1429                                                  0, libmsg->md->length);
1430                 else
1431                         rc = kibnal_setup_rd_kiov(tx, &ibmsg->ibm_u.get.ibgm_rd,
1432                                                   vv_acc_r_mem_write,
1433                                                   libmsg->md->md_niov,
1434                                                   libmsg->md->md_iov.kiov,
1435                                                   0, libmsg->md->length);
1436                 if (rc != 0) {
1437                         CERROR("Can't setup GET sink for "LPX64": %d\n", nid, rc);
1438                         kibnal_tx_done(tx);
1439                         return PTL_FAIL;
1440                 }
1441
1442                 n = ibmsg->ibm_u.get.ibgm_rd.rd_nfrag;
1443                 nob = offsetof(kib_get_msg_t, ibgm_rd.rd_frags[n]);
1444                 kibnal_init_tx_msg(tx, IBNAL_MSG_GET_REQ, nob);
1445
1446                 tx->tx_libmsg[1] = lib_create_reply_msg(&kibnal_lib, nid, libmsg);
1447                 if (tx->tx_libmsg[1] == NULL) {
1448                         CERROR("Can't create reply for GET -> "LPX64"\n", nid);
1449                         kibnal_tx_done(tx);
1450                         return PTL_FAIL;
1451                 }
1452
1453                 tx->tx_libmsg[0] = libmsg;      /* finalise libmsg[0,1] on completion */
1454                 tx->tx_waiting = 1;             /* waiting for GET_DONE */
1455                 kibnal_launch_tx(tx, nid);
1456                 return PTL_OK;
1457
1458         case PTL_MSG_ACK:
1459                 LASSERT (payload_nob == 0);
1460                 break;
1461
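             /* PUT rendezvous: payloads that fit go via the IMMEDIATE path
              * below.  Larger ones send PUT_REQ (header + cookie only); the
              * receiver answers with PUT_ACK describing its sink buffer, the
              * payload is RDMA-written across on receipt of that, and
              * PUT_DONE completes the exchange. */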
1462         case PTL_MSG_PUT:
1463                 /* Is the payload small enough not to need RDMA? */
1464                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1465                 if (nob <= IBNAL_MSG_SIZE)
1466                         break;
1467
1468                 tx = kibnal_get_idle_tx(1);     /* may block: caller is app thread */
1469                 LASSERT (tx != NULL);
1470
1471                 if (payload_kiov == NULL)
1472                         rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 0,
1473                                                  payload_niov, payload_iov,
1474                                                  payload_offset, payload_nob);
1475                 else
1476                         rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 0,
1477                                                   payload_niov, payload_kiov,
1478                                                   payload_offset, payload_nob);
1479                 if (rc != 0) {
1480                         CERROR("Can't setup PUT src for "LPX64": %d\n", nid, rc);
1481                         kibnal_tx_done(tx);
1482                         return PTL_FAIL;
1483                 }
1484
1485                 ibmsg = tx->tx_msg;
1486                 ibmsg->ibm_u.putreq.ibprm_hdr = *hdr;
1487                 ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie;
1488                 kibnal_init_tx_msg(tx, IBNAL_MSG_PUT_REQ, sizeof(kib_putreq_msg_t));
1489
1490                 tx->tx_libmsg[0] = libmsg;      /* finalise libmsg on completion */
1491                 tx->tx_waiting = 1;             /* waiting for PUT_{ACK,NAK} */
1492                 kibnal_launch_tx(tx, nid);
1493                 return PTL_OK;
1494         }
1495
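         /* Everything else (and small GETs/PUTs that broke out of the switch)
          * travels as a single IMMEDIATE message with the payload copied
          * inline.  NB only types other than ACK and REPLY may block waiting
          * for a tx descriptor; ACKs and REPLYs can be sent from contexts
          * that mustn't sleep. */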
1496         LASSERT (offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob])
1497                  <= IBNAL_MSG_SIZE);
1498
1499         tx = kibnal_get_idle_tx(!(type == PTL_MSG_ACK ||
1500                                   type == PTL_MSG_REPLY));
1501         if (tx == NULL) {
1502                 CERROR ("Can't send %d to "LPX64": tx descs exhausted\n", type, nid);
1503                 return PTL_NO_SPACE;
1504         }
1505
1506         ibmsg = tx->tx_msg;
1507         ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1508
1509         if (payload_nob > 0) {
1510                 if (payload_kiov != NULL)
1511                         lib_copy_kiov2buf(ibmsg->ibm_u.immediate.ibim_payload,
1512                                           payload_niov, payload_kiov,
1513                                           payload_offset, payload_nob);
1514                 else
1515                         lib_copy_iov2buf(ibmsg->ibm_u.immediate.ibim_payload,
1516                                          payload_niov, payload_iov,
1517                                          payload_offset, payload_nob);
1518         }
1519
1520         nob = offsetof(kib_immediate_msg_t, ibim_payload[payload_nob]);
1521         kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE, nob);
1522
1523         tx->tx_libmsg[0] = libmsg;              /* finalise libmsg on completion */
1524         kibnal_launch_tx(tx, nid);
1525         return PTL_OK;
1526 }
1527
1528 ptl_err_t
1529 kibnal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
1530                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1531                unsigned int payload_niov, struct iovec *payload_iov,
1532                size_t payload_offset, size_t payload_len)
1533 {
1534         CDEBUG(D_NET, "pid %d, nid "LPU64"\n",
1535                pid, nid);
1536         return (kibnal_sendmsg(nal, private, cookie,
1537                                hdr, type, nid, pid,
1538                                payload_niov, payload_iov, NULL,
1539                                payload_offset, payload_len));
1540 }
1541
1542 ptl_err_t
1543 kibnal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, 
1544                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1545                      unsigned int payload_niov, ptl_kiov_t *payload_kiov, 
1546                      size_t payload_offset, size_t payload_len)
1547 {
1548         return (kibnal_sendmsg(nal, private, cookie,
1549                                hdr, type, nid, pid,
1550                                payload_niov, NULL, payload_kiov,
1551                                payload_offset, payload_len));
1552 }
1553
1554 ptl_err_t
1555 kibnal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
1556                  unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
1557                  size_t offset, int mlen, int rlen)
1558 {
1559         kib_rx_t    *rx = private;
1560         kib_msg_t   *rxmsg = rx->rx_msg;
1561         kib_conn_t  *conn = rx->rx_conn;
1562         kib_tx_t    *tx;
1563         kib_msg_t   *txmsg;
1564         int          nob;
1565         int          rc;
1566         int          n;
1567         
1568         LASSERT (mlen <= rlen);
1569         LASSERT (mlen >= 0);
1570         LASSERT (!in_interrupt());
1571         /* Either all pages or all vaddrs */
1572         LASSERT (!(kiov != NULL && iov != NULL));
1573
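         /* rxmsg->ibm_type determines how the payload arrives: IMMEDIATE
          * carries it inline, PUT_REQ asks me to publish a sink buffer via
          * PUT_ACK, and a GET_REQ's payload has been dealt with already
          * (there is only junk to discard here). */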
1574         switch (rxmsg->ibm_type) {
1575         default:
1576                 LBUG();
1577                 
1578         case IBNAL_MSG_IMMEDIATE:
1579                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
1580                 if (nob > IBNAL_MSG_SIZE) {
1581                         CERROR ("Immediate message from "LPX64" too big: %d\n",
1582                                 rxmsg->ibm_u.immediate.ibim_hdr.src_nid, rlen);
1583                         return (PTL_FAIL);
1584                 }
1585
1586                 if (kiov != NULL)
1587                         lib_copy_buf2kiov(niov, kiov, offset,
1588                                           rxmsg->ibm_u.immediate.ibim_payload,
1589                                           mlen);
1590                 else
1591                         lib_copy_buf2iov(niov, iov, offset,
1592                                          rxmsg->ibm_u.immediate.ibim_payload,
1593                                          mlen);
1594
1595                 lib_finalize (nal, NULL, libmsg, PTL_OK);
1596                 return (PTL_OK);
1597
1598         case IBNAL_MSG_PUT_REQ:
1599                 /* NB rx_complete() will send PUT_NAK when I return to it from
1600                  * here, unless I set rx_responded!  */
1601
1602                 if (mlen == 0) { /* No payload to RDMA */
1603                         lib_finalize(nal, NULL, libmsg, PTL_OK);
1604                         return PTL_OK;
1605                 }
1606
1607                 tx = kibnal_get_idle_tx(0);
1608                 if (tx == NULL) {
1609                         CERROR("Can't allocate tx for "LPX64"\n",
1610                                conn->ibc_peer->ibp_nid);
1611                         return PTL_FAIL;
1612                 }
1613
1614                 txmsg = tx->tx_msg;
1615                 if (kiov == NULL)
1616                         rc = kibnal_setup_rd_iov(tx, 
1617                                                  &txmsg->ibm_u.putack.ibpam_rd,
1618                                                  vv_acc_r_mem_write,
1619                                                  niov, iov, offset, mlen);
1620                 else
1621                         rc = kibnal_setup_rd_kiov(tx,
1622                                                   &txmsg->ibm_u.putack.ibpam_rd,
1623                                                   vv_acc_r_mem_write,
1624                                                   niov, kiov, offset, mlen);
1625                 if (rc != 0) {
1626                         CERROR("Can't setup PUT sink for "LPX64": %d\n",
1627                                conn->ibc_peer->ibp_nid, rc);
1628                         kibnal_tx_done(tx);
1629                         return PTL_FAIL;
1630                 }
1631
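                 /* Cookies pair up the rendezvous: ibpam_src_cookie echoes the
                  * peer's PUT_REQ so it can locate its tx, while
                  * ibpam_dst_cookie is mine, to be quoted back in PUT_DONE so
                  * I can locate this one. */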
1632                 txmsg->ibm_u.putack.ibpam_src_cookie = rxmsg->ibm_u.putreq.ibprm_cookie;
1633                 txmsg->ibm_u.putack.ibpam_dst_cookie = tx->tx_cookie;
1634
1635                 n = tx->tx_msg->ibm_u.putack.ibpam_rd.rd_nfrag;
1636                 nob = offsetof(kib_putack_msg_t, ibpam_rd.rd_frags[n]);
1637                 kibnal_init_tx_msg(tx, IBNAL_MSG_PUT_ACK, nob);
1638
1639                 tx->tx_libmsg[0] = libmsg;      /* finalise libmsg on completion */
1640                 tx->tx_waiting = 1;             /* waiting for PUT_DONE */
1641                 kibnal_queue_tx(tx, conn);
1642
1643                 LASSERT (!rx->rx_responded);
1644                 rx->rx_responded = 1;
1645                 return PTL_OK;
1646
1647         case IBNAL_MSG_GET_REQ:
1648                 /* We get called here just to discard any junk after the
1649                  * GET hdr. */
1650                 LASSERT (libmsg == NULL);
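                 /* libmsg is NULL here; assuming lib_finalize() simply returns
                  * when passed a NULL msg, the call below is a no-op kept for
                  * symmetry with the other receive paths. */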
1651                 lib_finalize (nal, NULL, libmsg, PTL_OK);
1652                 return (PTL_OK);
1653         }
1654 }
1655
1656 ptl_err_t
1657 kibnal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
1658               unsigned int niov, struct iovec *iov, 
1659               size_t offset, size_t mlen, size_t rlen)
1660 {
1661         return (kibnal_recvmsg (nal, private, msg, niov, iov, NULL,
1662                                 offset, mlen, rlen));
1663 }
1664
1665 ptl_err_t
1666 kibnal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
1667                      unsigned int niov, ptl_kiov_t *kiov, 
1668                      size_t offset, size_t mlen, size_t rlen)
1669 {
1670         return (kibnal_recvmsg (nal, private, msg, niov, NULL, kiov,
1671                                 offset, mlen, rlen));
1672 }
1673
1674 int
1675 kibnal_thread_start (int (*fn)(void *arg), void *arg)
1676 {
1677         long    pid = kernel_thread (fn, arg, 0);
1678
1679         if (pid < 0)
1680                 return ((int)pid);
1681
1682         atomic_inc (&kibnal_data.kib_nthreads);
1683         return (0);
1684 }
1685
1686 void
1687 kibnal_thread_fini (void)
1688 {
1689         atomic_dec (&kibnal_data.kib_nthreads);
1690 }
1691
1692 void
1693 kibnal_close_conn_locked (kib_conn_t *conn, int error)
1694 {
1695         /* This just does the immediate housekeeping.  'error' is zero for a
1696          * normal shutdown, which can happen only after the connection has been
1697          * established.  If the connection is established, schedule the
1698          * connection to be finished off by the connd.  Otherwise the connd is
1699          * already dealing with it (either to set it up or tear it down).
1700          * Caller holds kib_global_lock exclusively in irq context */
1701         kib_peer_t   *peer = conn->ibc_peer;
1702
1703         LASSERT (error != 0 || conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
1704
1705         if (error != 0 && conn->ibc_comms_error == 0)
1706                 conn->ibc_comms_error = error;
1707
1708         if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
1709                 return; /* already being handled  */
1710
1711         CDEBUG (error == 0 ? D_NET : D_ERROR,
1712                 "closing conn to "LPX64": error %d\n", peer->ibp_nid, error);
1713
1714         /* connd takes ibc_list's ref */
1715         list_del (&conn->ibc_list);
1716         
1717         if (list_empty (&peer->ibp_conns) &&
1718             peer->ibp_persistence == 0) {
1719                 /* Non-persistent peer with no more conns... */
1720                 kibnal_unlink_peer_locked (peer);
1721         }
1722
1723         kibnal_set_conn_state(conn, IBNAL_CONN_DISCONNECT1);
1724
1725         spin_lock(&kibnal_data.kib_connd_lock);
1726
1727         list_add_tail (&conn->ibc_list, &kibnal_data.kib_connd_conns);
1728         wake_up (&kibnal_data.kib_connd_waitq);
1729                 
1730         spin_unlock(&kibnal_data.kib_connd_lock);
1731 }
1732
1733 void
1734 kibnal_close_conn (kib_conn_t *conn, int error)
1735 {
1736         unsigned long flags;
1737         
1738         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1739
1740         kibnal_close_conn_locked (conn, error);
1741         
1742         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1743 }
1744
1745 void
1746 kibnal_handle_early_rxs(kib_conn_t *conn)
1747 {
1748         unsigned long    flags;
1749         kib_rx_t        *rx;
1750
1751         LASSERT (!in_interrupt());
1752         LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
1753         
1754         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1755         while (!list_empty(&conn->ibc_early_rxs)) {
1756                 rx = list_entry(conn->ibc_early_rxs.next,
1757                                 kib_rx_t, rx_list);
1758                 list_del(&rx->rx_list);
1759                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1760                 
1761                 kibnal_handle_rx(rx);
1762                 
1763                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1764         }
1765         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1766 }
1767
1768 void
1769 kibnal_conn_disconnected(kib_conn_t *conn)
1770 {
1771         LIST_HEAD        (zombies); 
1772         struct list_head *tmp;
1773         struct list_head *nxt;
1774         kib_tx_t         *tx;
1775
1776         /* I'm the connd */
1777         LASSERT (!in_interrupt());
1778         LASSERT (current == kibnal_data.kib_connd);
1779         LASSERT (conn->ibc_state >= IBNAL_CONN_INIT);
1780         
1781         kibnal_set_conn_state(conn, IBNAL_CONN_DISCONNECTED);
1782
1783         /* move QP to error state to make posted work items complete */
1784         kibnal_set_qp_state(conn, vv_qp_state_error);
1785
1786         spin_lock(&conn->ibc_lock);
1787
1788         /* Complete all tx descs not waiting for sends to complete.
1789          * NB we should be safe from RDMA now that the QP has changed state */
1790
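         /* Two passes: txs still queued (never posted) and txs on the active
          * list (posted and/or awaiting a peer response).  Anything with
          * sends outstanding is left in place; the send completion path is
          * expected to call kibnal_tx_done() once tx_sending reaches zero. */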
1791         list_for_each_safe (tmp, nxt, &conn->ibc_tx_queue) {
1792                 tx = list_entry (tmp, kib_tx_t, tx_list);
1793
1794                 tx->tx_status = -ECONNABORTED;
1795                 tx->tx_waiting = 0;
1796                 
1797                 if (tx->tx_sending != 0)
1798                         continue;
1799
1800                 list_del (&tx->tx_list);
1801                 list_add (&tx->tx_list, &zombies);
1802         }
1803
1804         list_for_each_safe (tmp, nxt, &conn->ibc_active_txs) {
1805                 tx = list_entry (tmp, kib_tx_t, tx_list);
1806
1807                 LASSERT (tx->tx_waiting ||
1808                          tx->tx_sending != 0);
1809
1810                 tx->tx_status = -ECONNABORTED;
1811                 tx->tx_waiting = 0;
1812                 
1813                 if (tx->tx_sending != 0)
1814                         continue;
1815
1816                 list_del (&tx->tx_list);
1817                 list_add (&tx->tx_list, &zombies);
1818         }
1819         
1820         spin_unlock(&conn->ibc_lock);
1821
1822         while (!list_empty(&zombies)) {
1823                 tx = list_entry (zombies.next, kib_tx_t, tx_list);
1824
1825                 list_del(&tx->tx_list);
1826                 kibnal_tx_done (tx);
1827         }
1828
1829         kibnal_handle_early_rxs(conn);
1830 }
1831
1832 void
1833 kibnal_peer_connect_failed (kib_peer_t *peer, int active)
1834 {
1835         LIST_HEAD        (zombies);
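             /* NB 'zombies' must start out initialised: when the peer still
              * has connections, the list_empty() check below runs without
              * anything ever having been spliced onto it. */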
1836         kib_tx_t         *tx;
1837         unsigned long     flags;
1838
1839         /* Only the connd creates conns => single threaded */
1840         LASSERT (!in_interrupt());
1841         LASSERT (current == kibnal_data.kib_connd);
1842         LASSERT (peer->ibp_reconnect_interval >= IBNAL_MIN_RECONNECT_INTERVAL);
1843
1844         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1845
1846         if (active) {
1847                 LASSERT (peer->ibp_connecting != 0);
1848                 peer->ibp_connecting--;
1849         } else {
1850                 LASSERT (!kibnal_peer_active(peer));
1851         }
1852         
1853         if (peer->ibp_connecting != 0) {
1854                 /* another connection attempt under way (loopback?)... */
1855                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1856                 return;
1857         }
1858
1859         if (list_empty(&peer->ibp_conns)) {
1860                 /* Say when active connection can be re-attempted */
1861                 peer->ibp_reconnect_time = jiffies + peer->ibp_reconnect_interval;
1862                 /* Increase reconnection interval */
1863                 peer->ibp_reconnect_interval = MIN (peer->ibp_reconnect_interval * 2,
1864                                                     IBNAL_MAX_RECONNECT_INTERVAL);
1865         
1866                 /* Take peer's blocked transmits to complete with error */
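                 /* List-steal idiom (standard list.h semantics):
                  *     list_add(&zombies, &q);  - link 'zombies' in at q's head
                  *     list_del_init(&q);       - drop the old head
                  * leaving 'zombies' heading the stolen txs and ibp_tx_queue
                  * empty again. */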
1867                 list_add(&zombies, &peer->ibp_tx_queue);
1868                 list_del_init(&peer->ibp_tx_queue);
1869                 
1870                 if (kibnal_peer_active(peer) &&
1871                     (peer->ibp_persistence == 0)) {
1872                         /* failed connection attempt on non-persistent peer */
1873                         kibnal_unlink_peer_locked (peer);
1874                 }
1875         } else {
1876                 /* Can't have blocked transmits if there are connections */
1877                 LASSERT (list_empty(&peer->ibp_tx_queue));
1878         }
1879         
1880         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1881
1882         if (list_empty (&zombies)) 
1883                 return;
1884         
1885         CERROR ("Deleting messages for "LPX64": connection failed\n", peer->ibp_nid);
1886         do {
1887                 tx = list_entry (zombies.next, kib_tx_t, tx_list);
1888
1889                 list_del (&tx->tx_list);
1890                 /* complete now */
1891                 tx->tx_status = -EHOSTUNREACH;
1892                 kibnal_tx_done (tx);
1893         } while (!list_empty (&zombies));
1894 }
1895
1896 void
1897 kibnal_connreq_done(kib_conn_t *conn, int active, int status)
1898 {
1899         static cm_reject_data_t   rej;
1900
1901         struct list_head   txs;
1902         kib_peer_t        *peer = conn->ibc_peer;
1903         kib_peer_t        *peer2;
1904         unsigned long      flags;
1905         kib_tx_t          *tx;
1906
1907         /* Only the connd creates conns => single threaded */
1908         LASSERT (!in_interrupt());
1909         LASSERT (current == kibnal_data.kib_connd);
1910         LASSERT (conn->ibc_state < IBNAL_CONN_ESTABLISHED);
1911
1912         if (active) {
1913                 LASSERT (peer->ibp_connecting > 0);
1914         } else {
1915                 LASSERT (!kibnal_peer_active(peer));
1916         }
1917         
1918         PORTAL_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
1919         conn->ibc_connvars = NULL;
1920
1921         if (status != 0) {
1922                 /* failed to establish connection */
1923                 switch (conn->ibc_state) {
1924                 default:
1925                         LBUG();
1926                 case IBNAL_CONN_ACTIVE_CHECK_REPLY:
1927                         /* got a connection reply but failed checks */
1928                         LASSERT (active);
1929                         memset(&rej, 0, sizeof(rej));
1930                         rej.reason = cm_rej_code_usr_rej;
1931                         cm_reject(conn->ibc_cep, &rej);
1932                         break;
1933
1934                 case IBNAL_CONN_ACTIVE_CONNECT:
1935                         LASSERT (active);
1936                         cm_cancel(conn->ibc_cep);
1937                         kibnal_pause(HZ/10);
1938                         /* cm_connect() failed immediately or
1939                          * callback returned failure */
1940                         break;
1941
1942                 case IBNAL_CONN_ACTIVE_ARP:
1943                         LASSERT (active);
1944                         /* ibat_get_ib_data() failed immediately 
1945                          * or callback returned failure */
1946                         break;
1947
1948                 case IBNAL_CONN_INIT:
1949                         break;
1950
1951                 case IBNAL_CONN_PASSIVE_WAIT:
1952                         LASSERT (!active);
1953                         /* cm_accept callback returned failure */
1954                         break;
1955                 }
1956
1957                 kibnal_peer_connect_failed(conn->ibc_peer, active);
1958                 kibnal_conn_disconnected(conn);
1959                 return;
1960         }
1961
1962         /* connection established */
1963         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1964
1965         if (active) {
1966                 LASSERT(conn->ibc_state == IBNAL_CONN_ACTIVE_RTU);
1967         } else {
1968                 LASSERT(conn->ibc_state == IBNAL_CONN_PASSIVE_WAIT);
1969         }
1970         
1971         kibnal_set_conn_state(conn, IBNAL_CONN_ESTABLISHED);
1972
1973         if (!active) {
1974                 peer2 = kibnal_find_peer_locked(peer->ibp_nid);
1975                 if (peer2 != NULL) {
1976                         /* already in the peer table; swap */
1977                         conn->ibc_peer = peer2;
1978                         kibnal_peer_addref(peer2);
1979                         kibnal_peer_decref(peer);
1980                         peer = conn->ibc_peer;
1981                 } else {
1982                         /* add 'peer' to the peer table */
1983                         kibnal_peer_addref(peer);
1984                         list_add_tail(&peer->ibp_list,
1985                                       kibnal_nid2peerlist(peer->ibp_nid));
1986                 }
1987         }
1988         
1989         /* Add conn to peer's list and nuke any dangling conns from a different
1990          * peer instance... */
1991         kibnal_conn_addref(conn);               /* +1 ref for ibc_list */
1992         list_add(&conn->ibc_list, &peer->ibp_conns);
1993         kibnal_close_stale_conns_locked (conn->ibc_peer,
1994                                          conn->ibc_incarnation);
1995
1996         if (!kibnal_peer_active(peer) ||        /* peer has been deleted */
1997             conn->ibc_comms_error != 0 ||       /* comms error */
1998             conn->ibc_disconnect) {             /* need to disconnect */
1999                 
2000                 /* start to shut down connection */
2001                 kibnal_close_conn_locked(conn, -ECONNABORTED);
2002
2003                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2004                 kibnal_peer_connect_failed(peer, active);
2005                 return;
2006         }
2007
2008         if (active)
2009                 peer->ibp_connecting--;
2010
2011         /* grab pending txs while I have the lock */
2012         list_add(&txs, &peer->ibp_tx_queue);
2013         list_del_init(&peer->ibp_tx_queue);
2014         
2015         /* reset reconnect interval for next attempt */
2016         peer->ibp_reconnect_interval = IBNAL_MIN_RECONNECT_INTERVAL;
2017         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2018
2019         /* Schedule blocked txs */
2020         spin_lock (&conn->ibc_lock);
2021         while (!list_empty (&txs)) {
2022                 tx = list_entry (txs.next, kib_tx_t, tx_list);
2023                 list_del (&tx->tx_list);
2024
2025                 kibnal_queue_tx_locked (tx, conn);
2026         }
2027         spin_unlock (&conn->ibc_lock);
2028         kibnal_check_sends (conn);
2029
2030         /* schedule blocked rxs */
2031         kibnal_handle_early_rxs(conn);
2032 }
2033
2034 void
2035 kibnal_cm_callback(cm_cep_handle_t cep, cm_conn_data_t *cmdata, void *arg)
2036 {
2037         static cm_dreply_data_t drep;           /* just zeroed space */
2038         
2039         kib_conn_t             *conn = (kib_conn_t *)arg;
2040         unsigned long           flags;
2041         
2042         /* CAVEAT EMPTOR: tasklet context */
2043
2044         switch (cmdata->status) {
2045         default:
2046                 LBUG();
2047                 
2048         case cm_event_disconn_request:
2049                 /* IBNAL_CONN_ACTIVE_RTU:  gets closed in kibnal_connreq_done
2050                  * IBNAL_CONN_ESTABLISHED: I start it closing
2051                  * otherwise:              it's closing anyway */
2052                 cm_disconnect(conn->ibc_cep, NULL, &drep);
2053                 cm_cancel(conn->ibc_cep);
2054
2055                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2056                 LASSERT (!conn->ibc_disconnect);
2057                 conn->ibc_disconnect = 1;
2058
2059                 switch (conn->ibc_state) {
2060                 default:
2061                         LBUG();
2062
2063                 case IBNAL_CONN_ACTIVE_RTU:
2064                         /* kibnal_connreq_done is getting there; it'll see
2065                          * ibc_disconnect set... */
2066                         kibnal_conn_decref(conn); /* lose my ref */
2067                         break;
2068
2069                 case IBNAL_CONN_ESTABLISHED:
2070                         /* kibnal_connreq_done got there already; get
2071                          * disconnect going... */
2072                         kibnal_close_conn_locked(conn, 0);
2073                         kibnal_conn_decref(conn); /* lose my ref */
2074                         break;
2075
2076                 case IBNAL_CONN_DISCONNECT1:
2077                         /* kibnal_terminate_conn is getting there; it'll see
2078                          * ibc_disconnect set... */
2079                         kibnal_conn_decref(conn); /* lose my ref */
2080                         break;
2081
2082                 case IBNAL_CONN_DISCONNECT2:
2083                         /* kibnal_terminate_conn got there already; complete
2084                          * the disconnect.  NB kib_connd_conns takes my ref */
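                             /* interrupts are already disabled by the
                              * write_lock_irqsave() above, so a plain
                              * spin_lock on the connd lock suffices here */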
2085                         spin_lock(&kibnal_data.kib_connd_lock);
2086                         list_add_tail(&conn->ibc_list, &kibnal_data.kib_connd_conns);
2087                         wake_up(&kibnal_data.kib_connd_waitq);
2088                         spin_unlock(&kibnal_data.kib_connd_lock);
2089                         break;
2090                 }
2091                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2092                 return;
2093                 
2094         case cm_event_disconn_timeout:
2095         case cm_event_disconn_reply:
2096                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2097                 LASSERT (conn->ibc_state == IBNAL_CONN_DISCONNECT2);
2098                 LASSERT (!conn->ibc_disconnect);
2099                 conn->ibc_disconnect = 1;
2100
2101                 /* kibnal_terminate_conn sent the disconnect request.  
2102                  * NB kib_connd_conns takes my ref */
2103                 spin_lock(&kibnal_data.kib_connd_lock);
2104                 list_add_tail(&conn->ibc_list, &kibnal_data.kib_connd_conns);
2105                 wake_up(&kibnal_data.kib_connd_waitq);
2106                 spin_unlock(&kibnal_data.kib_connd_lock);
2107
2108                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2109                 break;
2110                 
2111         case cm_event_connected:
2112         case cm_event_conn_timeout:
2113         case cm_event_conn_reject:
2114                 LASSERT (conn->ibc_state == IBNAL_CONN_PASSIVE_WAIT);
2115                 conn->ibc_connvars->cv_conndata = *cmdata;
2116                 
2117                 spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2118                 list_add_tail(&conn->ibc_list, &kibnal_data.kib_connd_conns);
2119                 wake_up(&kibnal_data.kib_connd_waitq);
2120                 spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
2121                 break;
2122         }
2123 }
2124
2125 void
2126 kibnal_check_passive_wait(kib_conn_t *conn)
2127 {
2128         int     rc;
2129
2130         switch (conn->ibc_connvars->cv_conndata.status) {
2131         default:
2132                 LBUG();
2133                 
2134         case cm_event_connected:
2135                 kibnal_conn_addref(conn); /* ++ ref for CM callback */
2136                 rc = kibnal_set_qp_state(conn, vv_qp_state_rts);
2137                 if (rc != 0)
2138                         conn->ibc_comms_error = rc;
2139                 /* connection _has_ been established; it's just that we've had
2140                  * an error immediately... */
2141                 kibnal_connreq_done(conn, 0, 0);
2142                 break;
2143                 
2144         case cm_event_conn_timeout:
2145                 kibnal_connreq_done(conn, 0, -ETIMEDOUT);
2146                 break;
2147                 
2148         case cm_event_conn_reject:
2149                 kibnal_connreq_done(conn, 0, -ECONNRESET);
2150                 break;
2151         }
2152 }
2153
2154 void
2155 kibnal_recv_connreq(cm_cep_handle_t *cep, cm_request_data_t *cmreq)
2156 {
2157         static cm_reply_data_t  reply;
2158         static cm_reject_data_t reject;
2159
2160         kib_msg_t          *rxmsg = (kib_msg_t *)cmreq->priv_data;
2161         kib_msg_t          *txmsg;
2162         kib_conn_t         *conn = NULL;
2163         int                 rc = 0;
2164         kib_connvars_t     *cv;
2165         kib_peer_t         *tmp_peer;
2166         cm_return_t         cmrc;
2167         vv_return_t         vvrc;
2168         
2169         /* I'm the connd executing in thread context
2170          * No concurrency problems with static data! */
2171         LASSERT (!in_interrupt());
2172         LASSERT (current == kibnal_data.kib_connd);
2173
2174         if (cmreq->sid != IBNAL_SERVICE_NUMBER) {
2175                 CERROR(LPX64" != IBNAL_SERVICE_NUMBER("LPX64")\n",
2176                        cmreq->sid, (__u64)IBNAL_SERVICE_NUMBER);
2177                 goto reject;
2178         }
2179
2180         rc = kibnal_unpack_msg(rxmsg, cm_REQ_priv_data_len);
2181         if (rc != 0) {
2182                 CERROR("Can't parse connection request: %d\n", rc);
2183                 goto reject;
2184         }
2185
2186         if (rxmsg->ibm_type != IBNAL_MSG_CONNREQ) {
2187                 CERROR("Unexpected connreq msg type: %x from "LPX64"\n",
2188                        rxmsg->ibm_type, rxmsg->ibm_srcnid);
2189                 goto reject;
2190         }
2191
2192         if (rxmsg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid) {
2193                 CERROR("Can't accept "LPX64": bad dst nid "LPX64"\n",
2194                        rxmsg->ibm_srcnid, rxmsg->ibm_dstnid);
2195                 goto reject;
2196         }
2197
2198         if (rxmsg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
2199                 CERROR("Can't accept "LPX64": incompatible queue depth %d (%d wanted)\n",
2200                        rxmsg->ibm_srcnid, rxmsg->ibm_u.connparams.ibcp_queue_depth, 
2201                        IBNAL_MSG_QUEUE_SIZE);
2202                 goto reject;
2203         }
2204
2205         if (rxmsg->ibm_u.connparams.ibcp_max_msg_size > IBNAL_MSG_SIZE) {
2206                 CERROR("Can't accept "LPX64": message size %d too big (%d max)\n",
2207                        rxmsg->ibm_srcnid, rxmsg->ibm_u.connparams.ibcp_max_msg_size, 
2208                        IBNAL_MSG_SIZE);
2209                 goto reject;
2210         }
2211                 
2212         if (rxmsg->ibm_u.connparams.ibcp_max_frags > IBNAL_MAX_RDMA_FRAGS) {
2213                 CERROR("Can't accept "LPX64": max frags %d too big (%d max)\n",
2214                        rxmsg->ibm_srcnid, rxmsg->ibm_u.connparams.ibcp_max_frags, 
2215                        IBNAL_MAX_RDMA_FRAGS);
2216                 goto reject;
2217         }
2218                 
2219         conn = kibnal_create_conn(cep);
2220         if (conn == NULL) {
2221                 CERROR("Can't create conn for "LPX64"\n", rxmsg->ibm_srcnid);
2222                 goto reject;
2223         }
2224         
2225         /* assume 'rxmsg->ibm_srcnid' is a new peer */
2226         tmp_peer = kibnal_create_peer (rxmsg->ibm_srcnid);
2227         if (tmp_peer == NULL) {
2228                 CERROR("Can't create tmp peer for "LPX64"\n", rxmsg->ibm_srcnid);
2229                 kibnal_conn_decref(conn);
2230                 conn = NULL;
2231                 goto reject;
2232         }
2233
2234         conn->ibc_peer = tmp_peer;              /* conn takes over my ref */
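         /* The peer's startup stamp becomes this conn's incarnation, letting
          * stale conns from an earlier instance of the peer be detected and
          * nuked later; credits prime flow control at the agreed queue
          * depth. */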
2235         conn->ibc_incarnation = rxmsg->ibm_srcstamp;
2236         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2237
2238         cv = conn->ibc_connvars;
2239
2240         cv->cv_txpsn          = cmreq->cep_data.start_psn;
2241         cv->cv_remote_qpn     = cmreq->cep_data.qpn;
2242         cv->cv_path           = cmreq->path_data.path;
2243         cv->cv_rnr_count      = cmreq->cep_data.rtr_retry_cnt;
2244         // XXX                  cmreq->cep_data.retry_cnt;
2245         cv->cv_port           = cmreq->cep_data.local_port_num;
2246
2247         vvrc = gid2gid_index(kibnal_data.kib_hca, cv->cv_port,
2248                              &cv->cv_path.sgid, &cv->cv_sgid_index);
2249         LASSERT (vvrc == vv_return_ok);
2250         
2251         vvrc = pkey2pkey_index(kibnal_data.kib_hca, cv->cv_port,
2252                                cv->cv_path.pkey, &cv->cv_pkey_index);
2253         LASSERT (vvrc == vv_return_ok);
2254
2255         rc = kibnal_set_qp_state(conn, vv_qp_state_init);
2256         if (rc != 0)
2257                 goto reject;
2258
2259         rc = kibnal_post_receives(conn);
2260         if (rc != 0) {
2261                 CERROR("Can't post receives for "LPX64"\n", rxmsg->ibm_srcnid);
2262                 goto reject;
2263         }
2264
2265         rc = kibnal_set_qp_state(conn, vv_qp_state_rtr);
2266         if (rc != 0)
2267                 goto reject;
2268         
2269         memset(&reply, 0, sizeof(reply));
2270         reply.qpn                 = cv->cv_local_qpn;
2271         reply.qkey                = IBNAL_QKEY;
2272         reply.start_psn           = cv->cv_rxpsn;
2273         reply.arb_initiator_depth = IBNAL_ARB_INITIATOR_DEPTH;
2274         reply.arb_resp_res        = IBNAL_ARB_RESP_RES;
2275         reply.failover_accepted   = IBNAL_FAILOVER_ACCEPTED;
2276         reply.rnr_retry_count     = cv->cv_rnr_count;
2277         reply.targ_ack_delay      = kibnal_data.kib_hca_attrs.ack_delay;
2278         
2279         txmsg = (kib_msg_t *)&reply.priv_data;
2280         kibnal_init_msg(txmsg, IBNAL_MSG_CONNACK, 
2281                         sizeof(txmsg->ibm_u.connparams));
2282         LASSERT (txmsg->ibm_nob <= cm_REP_priv_data_len);
2283         txmsg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
2284         txmsg->ibm_u.connparams.ibcp_max_msg_size = IBNAL_MSG_SIZE;
2285         txmsg->ibm_u.connparams.ibcp_max_frags = IBNAL_MAX_RDMA_FRAGS;
2286         kibnal_pack_msg(txmsg, 0, rxmsg->ibm_srcnid, rxmsg->ibm_srcstamp);
2287         
2288         kibnal_set_conn_state(conn, IBNAL_CONN_PASSIVE_WAIT);
2289         
2290         cmrc = cm_accept(conn->ibc_cep, &reply, NULL,
2291                          kibnal_cm_callback, conn);
2292
2293         if (cmrc == cm_stat_success)
2294                 return;                         /* callback has got my ref on conn */
2295
2296         /* back out state change (no callback happening) */
2297         kibnal_set_conn_state(conn, IBNAL_CONN_INIT);
2298         rc = -EIO;
2299                 
2300  reject:
2301         CERROR("Rejected connreq from "LPX64"\n", rxmsg->ibm_srcnid);
2302
2303         memset(&reject, 0, sizeof(reject));
2304         reject.reason = cm_rej_code_usr_rej;
2305         cm_reject(cep, &reject);
2306
2307         if (conn != NULL) {
2308                 LASSERT (rc != 0);
2309                 kibnal_connreq_done(conn, 0, rc);
2310         } else {
2311                 cm_destroy_cep(cep);
2312         }
2313 }
2314
2315 void
2316 kibnal_listen_callback(cm_cep_handle_t cep, cm_conn_data_t *data, void *arg)
2317 {
2318         cm_request_data_t  *cmreq = &data->data.request;
2319         kib_pcreq_t        *pcr;
2320         unsigned long       flags;
2321         
2322         LASSERT (arg == NULL);
2323
2324         if (data->status != cm_event_conn_request) {
2325                 CERROR("status %d is not cm_event_conn_request\n",
2326                        data->status);
2327                 return;
2328         }
2329
2330         PORTAL_ALLOC_ATOMIC(pcr, sizeof(*pcr));
2331         if (pcr == NULL) {
2332                 CERROR("Can't allocate passive connreq\n");
2333
2334                 cm_reject(cep, &((cm_reject_data_t) /* NB RO struct */
2335                                  {.reason = cm_rej_code_no_res,}));
2336                 cm_destroy_cep(cep);
2337                 return;
2338         }
2339
2340         pcr->pcr_cep = cep;
2341         pcr->pcr_cmreq = *cmreq;
2342         
2343         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2344
2345         list_add_tail(&pcr->pcr_list, &kibnal_data.kib_connd_pcreqs);
2346         wake_up(&kibnal_data.kib_connd_waitq);
2347         
2348         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
2349 }
2350
2351
2352 void
2353 kibnal_active_connect_callback (cm_cep_handle_t cep, cm_conn_data_t *cd, 
2354                                 void *arg)
2355 {
2356         /* CAVEAT EMPTOR: tasklet context */
2357         kib_conn_t       *conn = (kib_conn_t *)arg;
2358         kib_connvars_t   *cv = conn->ibc_connvars;
2359         unsigned long     flags;
2360
2361         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_CONNECT);
2362         cv->cv_conndata = *cd;
2363
2364         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2365         /* connd takes my ref */
2366         list_add_tail(&conn->ibc_list, &kibnal_data.kib_connd_conns);
2367         wake_up(&kibnal_data.kib_connd_waitq);
2368         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
2369 }
2370
2371 void
2372 kibnal_connect_conn (kib_conn_t *conn)
2373 {
2374         static cm_request_data_t  cmreq;
2375         kib_msg_t                *msg = (kib_msg_t *)&cmreq.priv_data;
2376         kib_connvars_t           *cv = conn->ibc_connvars;
2377         kib_peer_t               *peer = conn->ibc_peer;
2378         cm_return_t               cmrc;
2379         
2380         /* Only called by connd => statics OK */
2381         LASSERT (!in_interrupt());
2382         LASSERT (current == kibnal_data.kib_connd);
2383         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_ARP);
2384
2385         memset(&cmreq, 0, sizeof(cmreq));
2386         
2387         cmreq.sid = IBNAL_SERVICE_NUMBER;
2388
2389         cmreq.cep_data.ca_guid              = kibnal_data.kib_hca_attrs.guid;
2390         cmreq.cep_data.qpn                  = cv->cv_local_qpn;
2391         cmreq.cep_data.retry_cnt            = IBNAL_RETRY_CNT;
2392         cmreq.cep_data.rtr_retry_cnt        = IBNAL_RNR_CNT;
2393         cmreq.cep_data.start_psn            = cv->cv_rxpsn;
2394         cmreq.cep_data.end_to_end_flow_ctrl = IBNAL_EE_FLOW_CNT;
2395         // XXX ack_timeout?
2396         // offered_resp_res
2397         // offered_initiator_depth
2398
2399         cmreq.path_data.subn_local  = IBNAL_LOCAL_SUB;
2400         cmreq.path_data.path        = cv->cv_path;
2401         
2402         kibnal_init_msg(msg, IBNAL_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
2403         LASSERT(msg->ibm_nob <= cm_REQ_priv_data_len);
2404         msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
2405         msg->ibm_u.connparams.ibcp_max_msg_size = IBNAL_MSG_SIZE;
2406         msg->ibm_u.connparams.ibcp_max_frags = IBNAL_MAX_RDMA_FRAGS;
2407         kibnal_pack_msg(msg, 0, peer->ibp_nid, 0);
2408         
2409         CDEBUG(D_NET, "Connecting %p to "LPX64"\n", conn, peer->ibp_nid);
2410
2411         kibnal_conn_addref(conn);               /* ++ref for CM callback */
2412         kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_CONNECT);
2413
2414         cmrc = cm_connect(conn->ibc_cep, &cmreq, 
2415                           kibnal_active_connect_callback, conn);
2416         if (cmrc == cm_stat_success) {
2417                 CDEBUG(D_NET, "connection REQ sent to "LPX64"\n",
2418                        peer->ibp_nid);
2419                 return;
2420         }
2421
2422         CERROR ("Connect "LPX64" failed: %d\n", peer->ibp_nid, cmrc);
2423         kibnal_conn_decref(conn);       /* drop callback's ref */
2424         kibnal_connreq_done(conn, 1, -EHOSTUNREACH);
2425 }
2426
2427 void
2428 kibnal_check_connreply (kib_conn_t *conn)
2429 {
2430         static cm_rtu_data_t  rtu;
2431
2432         kib_connvars_t   *cv = conn->ibc_connvars;
2433         cm_reply_data_t  *reply = &cv->cv_conndata.data.reply;
2434         kib_msg_t        *msg = (kib_msg_t *)&reply->priv_data;
2435         kib_peer_t       *peer = conn->ibc_peer;
2436         cm_return_t       cmrc;
2437         cm_cep_handle_t   cep;
2438         unsigned long     flags;
2439         int               rc;
2440
2441         /* Only called by connd => statics OK */
2442         LASSERT (!in_interrupt());
2443         LASSERT (current == kibnal_data.kib_connd);
2444         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_CONNECT);
2445
2446         if (cv->cv_conndata.status == cm_event_conn_reply) {
2447                 cv->cv_remote_qpn = reply->qpn;
2448                 cv->cv_txpsn      = reply->start_psn;
2449                 // XXX              reply->targ_ack_delay;
2450                 cv->cv_rnr_count  = reply->rnr_retry_count;
2451
2452                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_CHECK_REPLY);
2453
2454                 rc = kibnal_unpack_msg(msg, cm_REP_priv_data_len);
2455                 if (rc != 0) {
2456                         CERROR("Can't unpack reply from "LPX64"\n",
2457                                peer->ibp_nid);
2458                         kibnal_connreq_done(conn, 1, rc);
2459                         return;
2460                 }
2461
2462                 if (msg->ibm_type != IBNAL_MSG_CONNACK) {
2463                         CERROR("Unexpected message type %d from "LPX64"\n",
2464                                msg->ibm_type, peer->ibp_nid);
2465                         kibnal_connreq_done(conn, 1, -EPROTO);
2466                         return;
2467                 }
2468
2469                 if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
2470                         CERROR(LPX64" has incompatible queue depth %d (%d wanted)\n",
2471                                peer->ibp_nid, msg->ibm_u.connparams.ibcp_queue_depth,
2472                                IBNAL_MSG_QUEUE_SIZE);
2473                         kibnal_connreq_done(conn, 1, -EPROTO);
2474                         return;
2475                 }
2476                 
2477                 if (msg->ibm_u.connparams.ibcp_max_msg_size > IBNAL_MSG_SIZE) {
2478                         CERROR(LPX64" max message size %d too big (%d max)\n",
2479                                peer->ibp_nid, msg->ibm_u.connparams.ibcp_max_msg_size, 
2480                                IBNAL_MSG_SIZE);
2481                         kibnal_connreq_done(conn, 1, -EPROTO);
2482                         return;
2483                 }
2484
2485                 if (msg->ibm_u.connparams.ibcp_max_frags > IBNAL_MAX_RDMA_FRAGS) {
2486                         CERROR(LPX64" max frags %d too big (%d max)\n",
2487                                peer->ibp_nid, msg->ibm_u.connparams.ibcp_max_frags, 
2488                                IBNAL_MAX_RDMA_FRAGS);
2489                         kibnal_connreq_done(conn, 1, -EPROTO);
2490                         return;
2491                 }
2492                 
2493                 read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2494                 rc = (msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid ||
2495                       msg->ibm_dststamp != kibnal_data.kib_incarnation) ?
2496                      -ESTALE : 0;
2497                 read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2498                 if (rc != 0) {
2499                         CERROR("Stale connection reply from "LPX64"\n",
2500                                peer->ibp_nid);
2501                         kibnal_connreq_done(conn, 1, rc);
2502                         return;
2503                 }
2504
2505                 conn->ibc_incarnation = msg->ibm_srcstamp;
2506                 conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2507                 
2508                 rc = kibnal_post_receives(conn);
2509                 if (rc != 0) {
2510                         CERROR("Can't post receives for "LPX64"\n",
2511                                peer->ibp_nid);
2512                         kibnal_connreq_done(conn, 1, rc);
2513                         return;
2514                 }
2515                 
2516                 rc = kibnal_set_qp_state(conn, vv_qp_state_rtr);
2517                 if (rc != 0) {
2518                         kibnal_connreq_done(conn, 1, rc);
2519                         return;
2520                 }
2521                 
2522                 rc = kibnal_set_qp_state(conn, vv_qp_state_rts);
2523                 if (rc != 0) {
2524                         kibnal_connreq_done(conn, 1, rc);
2525                         return;
2526                 }
2527                 
2528                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_RTU);
2529                 kibnal_conn_addref(conn);       /* ++for CM callback */
2530                 
2531                 memset(&rtu, 0, sizeof(rtu));
2532                 cmrc = cm_accept(conn->ibc_cep, NULL, &rtu,
2533                                  kibnal_cm_callback, conn);
2534                 if (cmrc == cm_stat_success) {
2535                         /* Now I'm racing with disconnect signalled by
2536                          * kibnal_cm_callback */
2537                         kibnal_connreq_done(conn, 1, 0);
2538                         return;
2539                 }
2540
2541                 CERROR("cm_accept "LPX64" failed: %d\n", peer->ibp_nid, cmrc);
2542                 /* Back out of RTU: no callback coming */
2543                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_CHECK_REPLY);
2544                 kibnal_conn_decref(conn);
2545                 kibnal_connreq_done(conn, 1, -EIO);
2546                 return;
2547         }
2548
2549         if (cv->cv_conndata.status == cm_event_conn_reject) {
2550
2551                 if (cv->cv_conndata.data.reject.reason != cm_rej_code_stale_conn) {
2552                         CERROR("conn -> "LPX64" rejected: %d\n", peer->ibp_nid,
2553                                cv->cv_conndata.data.reject.reason);
2554                         kibnal_connreq_done(conn, 1, -ECONNREFUSED);
2555                         return;
2556                 }
2557
2558                 CWARN ("conn -> "LPX64" stale: retrying\n", peer->ibp_nid);
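                 /* A stale_conn reject presumably means the peer's CM still
                  * holds state from an earlier incarnation of this connection;
                  * that makes the current CEP unusable, so create a fresh one
                  * and redo the connect from the top. */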
2559
2560                 cep = cm_create_cep(cm_cep_transp_rc);
2561                 if (cep == NULL) {
2562                         CERROR("Can't create new CEP\n");
2563                         kibnal_connreq_done(conn, 1, -ENOMEM);
2564                         return;
2565                 }
2566
2567                 cmrc = cm_cancel(conn->ibc_cep);
2568                 LASSERT (cmrc == cm_stat_success);
2569                 cmrc = cm_destroy_cep(conn->ibc_cep);
2570                 LASSERT (cmrc == cm_stat_success);
2571
2572                 conn->ibc_cep = cep;
2573
2574                 /* retry connect */
2575                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_ARP);
2576                 kibnal_connect_conn(conn);
2577                 return;
2578         }
2579
2580         CERROR("conn -> "LPX64" failed: %d\n", peer->ibp_nid,
2581                cv->cv_conndata.status);
2582         kibnal_connreq_done(conn, 1, -ECONNABORTED);
2583 }
2584
2585 void
2586 kibnal_send_connreq (kib_conn_t *conn)
2587 {
2588         kib_peer_t           *peer = conn->ibc_peer;
2589         kib_connvars_t       *cv = conn->ibc_connvars;
2590         ibat_arp_data_t      *arp = &cv->cv_arp;
2591         ib_path_record_v2_t  *path = &cv->cv_path;
2592         vv_return_t           vvrc;
2593         int                   rc;
2594
2595         /* Only called by connd => statics OK */
2596         LASSERT (!in_interrupt());
2597         LASSERT (current == kibnal_data.kib_connd);
2598         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_ARP);
2599         
2600         if (cv->cv_arprc != ibat_stat_ok) {
2601                 CERROR("Can't Arp "LPX64"@%u.%u.%u.%u: %d\n", peer->ibp_nid,
2602                        HIPQUAD(peer->ibp_ip), cv->cv_arprc);
2603                 kibnal_connreq_done(conn, 1, -ENETUNREACH);
2604                 return;
2605         }
2606
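         /* ARP can yield either a complete primary path record, or just the
          * peer's LID, in which case a path record is synthesised from
          * scratch using compiled-in defaults (pkey, SL, rate etc). */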
2607         if ((arp->mask & IBAT_PRI_PATH_VALID) != 0) {
2608                 CDEBUG(D_NET, "Got valid path for "LPX64"\n", peer->ibp_nid);
2609
2610                 *path = *arp->primary_path;
2611
2612                 vvrc = base_gid2port_num(kibnal_data.kib_hca, &path->sgid,
2613                                          &cv->cv_port);
2614                 LASSERT (vvrc == vv_return_ok);
2615
2616                 vvrc = gid2gid_index(kibnal_data.kib_hca, cv->cv_port,
2617                                      &path->sgid, &cv->cv_sgid_index);
2618                 LASSERT (vvrc == vv_return_ok);
2619
2620                 vvrc = pkey2pkey_index(kibnal_data.kib_hca, cv->cv_port,
2621                                        path->pkey, &cv->cv_pkey_index);
2622                 LASSERT (vvrc == vv_return_ok);
2623
2624                 path->mtu = IBNAL_IB_MTU;
2625
2626         } else if ((arp->mask & IBAT_LID_VALID) != 0) {
2627                 CWARN("Creating new path record for "LPX64"@%u.%u.%u.%u\n",
2628                       peer->ibp_nid, HIPQUAD(peer->ibp_ip));
2629
2630                 cv->cv_pkey_index = IBNAL_PKEY_IDX;
2631                 cv->cv_sgid_index = IBNAL_SGID_IDX;
2632                 cv->cv_port = arp->local_port_num;
2633
2634                 memset(path, 0, sizeof(*path));
2635
2636                 vvrc = port_num2base_gid(kibnal_data.kib_hca, cv->cv_port,
2637                                          &path->sgid);
2638                 LASSERT (vvrc == vv_return_ok);
2639
2640                 vvrc = port_num2base_lid(kibnal_data.kib_hca, cv->cv_port,
2641                                          &path->slid);
2642                 LASSERT (vvrc == vv_return_ok);
2643
2644                 path->dgid          = arp->gid;
2645                 path->sl            = IBNAL_SERVICE_LEVEL;
2646                 path->dlid          = arp->lid;
2647                 path->mtu           = IBNAL_IB_MTU;
2648                 path->rate          = IBNAL_STATIC_RATE;
2649                 path->pkt_life_time = IBNAL_PKT_LIFETIME;
2650                 path->pkey          = IBNAL_PKEY;
2651                 path->traffic_class = IBNAL_TRAFFIC_CLASS;
2652         } else {
2653                 CERROR("Can't Arp "LPX64"@%u.%u.%u.%u: no PATH or LID\n", 
2654                        peer->ibp_nid, HIPQUAD(peer->ibp_ip));
2655                 kibnal_connreq_done(conn, 1, -ENETUNREACH);
2656                 return;
2657         }
2658
2659         rc = kibnal_set_qp_state(conn, vv_qp_state_init);
2660         if (rc != 0) {
2661                 kibnal_connreq_done(conn, 1, rc);
                     return;         /* NB mustn't fall through and connect */
2662         }
2663
2664         /* do the actual connection request */
2665         kibnal_connect_conn(conn);
2666 }
2667
2668 void
2669 kibnal_arp_callback (ibat_stat_t arprc, ibat_arp_data_t *arp_data, void *arg)
2670 {
2671         /* CAVEAT EMPTOR: tasklet context */
2672         kib_conn_t      *conn = (kib_conn_t *)arg;
2673         kib_peer_t      *peer = conn->ibc_peer;
2674         unsigned long    flags;
2675
2676         CDEBUG(D_NET, "Arp "LPX64"@%u.%u.%u.%u rc %d LID %s PATH %s\n",
2677                peer->ibp_nid, HIPQUAD(peer->ibp_ip), arprc,
2678                (arp_data->mask & IBAT_LID_VALID) == 0 ? "invalid" : "valid",
2679                (arp_data->mask & IBAT_PRI_PATH_VALID) == 0 ? "invalid" : "valid");
2680         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_ARP);
2681
2682         conn->ibc_connvars->cv_arprc = arprc;
2683         if (arprc == ibat_stat_ok)
2684                 conn->ibc_connvars->cv_arp = *arp_data;
2685         
2686         /* connd takes over my ref on conn */
2687         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2688         
2689         list_add_tail(&conn->ibc_list, &kibnal_data.kib_connd_conns);
2690         wake_up(&kibnal_data.kib_connd_waitq);
2691         
2692         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
2693 }
2694
void
kibnal_arp_peer (kib_peer_t *peer)
{
        cm_cep_handle_t  cep;
        kib_conn_t      *conn;
        int              ibatrc;

        /* Only the connd does this (i.e. single threaded) */
        LASSERT (current == kibnal_data.kib_connd);
        LASSERT (peer->ibp_connecting != 0);

        cep = cm_create_cep(cm_cep_transp_rc);
        if (cep == NULL) {
                CERROR ("Can't create cep for conn->"LPX64"\n",
                        peer->ibp_nid);
                kibnal_peer_connect_failed(peer, 1);
                return;
        }

        conn = kibnal_create_conn(cep);
        if (conn == NULL) {
                CERROR ("Can't allocate conn->"LPX64"\n",
                        peer->ibp_nid);
                cm_destroy_cep(cep);
                kibnal_peer_connect_failed(peer, 1);
                return;
        }

        conn->ibc_peer = peer;
        kibnal_peer_addref(peer);

        kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_ARP);

        ibatrc = ibat_get_ib_data(htonl(peer->ibp_ip), INADDR_ANY,
                                  ibat_paths_primary,
                                  &conn->ibc_connvars->cv_arp,
                                  kibnal_arp_callback, conn, 0);
        CDEBUG(D_NET,"ibatrc %d\n", ibatrc);
        switch (ibatrc) {
        default:
                LBUG();

        case ibat_stat_pending:
                /* NB callback has my ref on conn */
                break;

        case ibat_stat_ok:
                /* Immediate return (ARP cache hit) == no callback. */
                kibnal_send_connreq(conn);
                kibnal_conn_decref(conn);
                break;

        case ibat_stat_error:
        case ibat_stat_timeout:
        case ibat_stat_not_found:
                CERROR("Arp "LPX64"@%u.%u.%u.%u failed: %d\n", peer->ibp_nid,
                       HIPQUAD(peer->ibp_ip), ibatrc);
                kibnal_connreq_done(conn, 1, -ENETUNREACH);
                kibnal_conn_decref(conn);
                break;
        }
}

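/* A connection has timed out if any tx on either of its queues has
 * passed its deadline: ibc_tx_queue holds sends not yet posted (e.g.
 * stalled waiting for credits) while ibc_active_txs holds sends awaiting
 * completion or a peer response.  Both lists are scanned under
 * ibc_lock. */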
int
kibnal_conn_timed_out (kib_conn_t *conn)
{
        kib_tx_t          *tx;
        struct list_head  *ttmp;

        spin_lock(&conn->ibc_lock);

        list_for_each (ttmp, &conn->ibc_tx_queue) {
                tx = list_entry (ttmp, kib_tx_t, tx_list);

                if (time_after_eq (jiffies, tx->tx_deadline)) {
                        spin_unlock(&conn->ibc_lock);
                        return 1;
                }
        }

        list_for_each (ttmp, &conn->ibc_active_txs) {
                tx = list_entry (ttmp, kib_tx_t, tx_list);

                LASSERT (tx->tx_waiting ||
                         tx->tx_sending != 0);

                if (time_after_eq (jiffies, tx->tx_deadline)) {
                        spin_unlock(&conn->ibc_lock);
                        return 1;
                }
        }

        spin_unlock(&conn->ibc_lock);
        return 0;
}

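/* Sweep peer hash bucket 'idx' for timed-out connections.  Called
 * periodically by the connd (see the chunk arithmetic there), which
 * walks the whole table several times per timeout interval. */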
void
kibnal_check_conns (int idx)
{
        struct list_head  *peers = &kibnal_data.kib_peers[idx];
        struct list_head  *ptmp;
        kib_peer_t        *peer;
        kib_conn_t        *conn;
        struct list_head  *ctmp;
        unsigned long      flags;

 again:
        /* NB. We expect to have a look at all the peers and not find any
         * rdmas to time out, so we just use a shared lock while we
         * take a look... */
        read_lock_irqsave(&kibnal_data.kib_global_lock, flags);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, kib_peer_t, ibp_list);

                list_for_each (ctmp, &peer->ibp_conns) {
                        conn = list_entry (ctmp, kib_conn_t, ibc_list);

                        LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);

                        /* In case we have enough credits to return via a
                         * NOOP, but there were no non-blocking tx descs
                         * free to do it last time... */
                        kibnal_check_sends(conn);

                        if (!kibnal_conn_timed_out(conn))
                                continue;

                        /* Handle timeout by closing the whole connection.  We
                         * can only be sure RDMA activity has ceased once the
                         * QP has been modified. */

                        kibnal_conn_addref(conn); /* 1 ref for me... */

                        read_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                               flags);

                        CERROR("Timed out RDMA with "LPX64"\n",
                               peer->ibp_nid);

                        kibnal_close_conn (conn, -ETIMEDOUT);
                        kibnal_conn_decref(conn); /* ...until here */

                        /* start again now that I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
}

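/* Initiate an active disconnect, or complete one the CM has already
 * called back about.  Only the connd runs this (single threaded), which
 * is why the static dreq buffer is safe. */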
void
kibnal_disconnect_conn (kib_conn_t *conn)
{
        static cm_drequest_data_t dreq;         /* just for the space */

        cm_return_t    cmrc;
        unsigned long  flags;

        LASSERT (!in_interrupt());
        LASSERT (current == kibnal_data.kib_connd);

        write_lock_irqsave(&kibnal_data.kib_global_lock, flags);

        if (conn->ibc_disconnect) {
                /* Had the CM callback already */
                write_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                        flags);
                kibnal_conn_disconnected(conn);
                return;
        }

        LASSERT (conn->ibc_state == IBNAL_CONN_DISCONNECT1);

        /* active disconnect */
        cmrc = cm_disconnect(conn->ibc_cep, &dreq, NULL);
        if (cmrc == cm_stat_success) {
                /* waiting for CM */
                conn->ibc_state = IBNAL_CONN_DISCONNECT2;
                write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
                return;
        }

        write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);

        cm_cancel(conn->ibc_cep);
        kibnal_pause(HZ/10);

        if (!conn->ibc_disconnect)              /* CM callback will never happen now */
                kibnal_conn_decref(conn);

        LASSERT (atomic_read(&conn->ibc_refcount) > 0);
        LASSERT (conn->ibc_state == IBNAL_CONN_DISCONNECT1);

        kibnal_conn_disconnected(conn);
}

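/* The connd is the single thread that drives all connection state
 * transitions.  Each pass of its loop drains four work lists in turn
 * (zombie conns to destroy, passive connection requests, peers awaiting
 * ARP, and conns needing a state-machine step) and then, when the
 * current deadline expires, sweeps a slice of the peer table for RDMA
 * timeouts.  'dropped_lock' records whether any work was done; only a
 * completely idle pass goes to sleep. */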
int
kibnal_connd (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        kib_pcreq_t       *pcr;
        kib_conn_t        *conn;
        kib_peer_t        *peer;
        int                timeout;
        int                i;
        int                dropped_lock;
        int                peer_index = 0;
        unsigned long      deadline = jiffies;

        kportal_daemonize ("kibnal_connd");
        kportal_blockallsigs ();

        init_waitqueue_entry (&wait, current);
        kibnal_data.kib_connd = current;

        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);

        while (!kibnal_data.kib_shutdown) {

                dropped_lock = 0;

                if (!list_empty (&kibnal_data.kib_connd_zombies)) {
                        conn = list_entry (kibnal_data.kib_connd_zombies.next,
                                           kib_conn_t, ibc_list);
                        list_del (&conn->ibc_list);

                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
                        dropped_lock = 1;

                        kibnal_destroy_conn(conn);

                        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                }

                if (!list_empty (&kibnal_data.kib_connd_pcreqs)) {
                        pcr = list_entry(kibnal_data.kib_connd_pcreqs.next,
                                         kib_pcreq_t, pcr_list);
                        list_del(&pcr->pcr_list);

                        spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
                        dropped_lock = 1;

                        kibnal_recv_connreq(pcr->pcr_cep, &pcr->pcr_cmreq);
                        PORTAL_FREE(pcr, sizeof(*pcr));

                        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
                }

                if (!list_empty (&kibnal_data.kib_connd_peers)) {
                        peer = list_entry (kibnal_data.kib_connd_peers.next,
                                           kib_peer_t, ibp_connd_list);

                        list_del_init (&peer->ibp_connd_list);
                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
                        dropped_lock = 1;

                        kibnal_arp_peer (peer);
                        kibnal_peer_decref (peer);

                        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                }

                if (!list_empty (&kibnal_data.kib_connd_conns)) {
                        conn = list_entry (kibnal_data.kib_connd_conns.next,
                                           kib_conn_t, ibc_list);
                        list_del (&conn->ibc_list);

                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
                        dropped_lock = 1;

                        switch (conn->ibc_state) {
                        default:
                                LBUG();

                        case IBNAL_CONN_ACTIVE_ARP:
                                kibnal_send_connreq(conn);
                                break;

                        case IBNAL_CONN_ACTIVE_CONNECT:
                                kibnal_check_connreply(conn);
                                break;

                        case IBNAL_CONN_PASSIVE_WAIT:
                                kibnal_check_passive_wait(conn);
                                break;

                        case IBNAL_CONN_DISCONNECT1:
                        case IBNAL_CONN_DISCONNECT2:
                                kibnal_disconnect_conn(conn);
                                break;
                        }
                        kibnal_conn_decref(conn);

                        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                }

                /* careful with the jiffy wrap... */
                timeout = (int)(deadline - jiffies);
                if (timeout <= 0) {
                        const int n = 4;
                        const int p = 1;
                        int       chunk = kibnal_data.kib_peer_hash_size;

                        spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
                        dropped_lock = 1;

                        /* Time to check for RDMA timeouts on a few more
                         * peers: I do checks every 'p' seconds on a
                         * proportion of the peer table and I need to check
                         * every connection 'n' times within a timeout
                         * interval, to ensure I detect a timeout on any
                         * connection within (n+1)/n times the timeout
                         * interval. */

                        if (kibnal_tunables.kib_io_timeout > n * p)
                                chunk = (chunk * n * p) /
                                        kibnal_tunables.kib_io_timeout;
                        if (chunk == 0)
                                chunk = 1;

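                        /* Worked example (illustrative values, not
                         * defaults): with a peer hash of 101 buckets and
                         * kib_io_timeout of 50 seconds, chunk =
                         * (101 * 4 * 1) / 50 = 8, so each 1-second tick
                         * checks 8 buckets and the whole table is covered
                         * in ~13 ticks, i.e. ~4 full sweeps per timeout
                         * interval as required. */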
                        for (i = 0; i < chunk; i++) {
                                kibnal_check_conns (peer_index);
                                peer_index = (peer_index + 1) %
                                             kibnal_data.kib_peer_hash_size;
                        }

                        deadline += p * HZ;
                        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
                }

                if (dropped_lock)
                        continue;

                /* Nothing to do for 'timeout' jiffies */
                set_current_state (TASK_INTERRUPTIBLE);
                add_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
                spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

                schedule_timeout (timeout);

                set_current_state (TASK_RUNNING);
                remove_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
                spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
        }

        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

        kibnal_thread_fini ();
        return (0);
}

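/* HCA asynchronous event handler.  None of these events is expected or
 * handled; they are just logged for diagnosis. */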
void
kibnal_async_callback(vv_event_record_t ev)
{
        CERROR("type: %d, port: %d, data: "LPX64"\n",
               ev.event_type, ev.port_num, ev.type.data);
}

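/* Completion event dispatch: the CQ callback may fire in a context where
 * no real work can be done, so it only marks the CQ ready and wakes a
 * scheduler thread to do the actual polling. */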
void
kibnal_cq_callback (unsigned long unused_context)
{
        unsigned long    flags;

        CDEBUG(D_NET, "!!\n");

        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
        kibnal_data.kib_ready = 1;
        wake_up(&kibnal_data.kib_sched_waitq);
        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
}

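/* Scheduler threads poll the CQ.  The kib_ready/kib_checking_cq flags
 * implement a simple handoff: exactly one scheduler owns the polling at
 * any time, kib_ready is cleared optimistically before each poll, and
 * when a completion is found kib_ready is re-set and another scheduler
 * woken, so completions can be handled and polled for concurrently.
 * When the CQ drains, completion notification is re-armed before polling
 * ownership is released, so no event can be lost in the gap. */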
int
kibnal_scheduler(void *arg)
{
        long            id = (long)arg;
        wait_queue_t    wait;
        char            name[16];
        vv_wc_t         wc;
        vv_return_t     vvrc;
        vv_return_t     vvrc2;
        unsigned long   flags;
        int             busy_loops = 0;

        snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
        kportal_daemonize(name);
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);

        while (!kibnal_data.kib_shutdown) {
                if (busy_loops++ >= IBNAL_RESCHED) {
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);

                        our_cond_resched();
                        busy_loops = 0;

                        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
                }

                if (kibnal_data.kib_ready &&
                    !kibnal_data.kib_checking_cq) {
                        /* take ownership of completion polling */
                        kibnal_data.kib_checking_cq = 1;
                        /* Assume I'll exhaust the CQ */
                        kibnal_data.kib_ready = 0;
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);

                        vvrc = vv_poll_for_completion(kibnal_data.kib_hca,
                                                      kibnal_data.kib_cq, &wc);
                        if (vvrc == vv_return_err_cq_empty) {
                                vvrc2 = vv_request_completion_notification(
                                        kibnal_data.kib_hca,
                                        kibnal_data.kib_cq,
                                        vv_next_solicit_unsolicit_event);
                                LASSERT (vvrc2 == vv_return_ok);
                        }

                        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
                        /* give up ownership of completion polling */
                        kibnal_data.kib_checking_cq = 0;

                        if (vvrc == vv_return_err_cq_empty)
                                continue;

                        LASSERT (vvrc == vv_return_ok);
                        /* Assume there's more: get another scheduler to check
                         * while I handle this completion... */

                        kibnal_data.kib_ready = 1;
                        wake_up(&kibnal_data.kib_sched_waitq);

                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);

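                        /* The work request id encodes both the completion
                         * type and a pointer to the owning descriptor
                         * (unpacked by kibnal_wreqid2type() and
                         * kibnal_wreqid2ptr() below), so a single CQ
                         * serves rx, tx and RDMA work items. */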
                        switch (kibnal_wreqid2type(wc.wr_id)) {
                        case IBNAL_WID_RX:
                                kibnal_rx_complete(
                                        (kib_rx_t *)kibnal_wreqid2ptr(wc.wr_id),
                                        wc.completion_status,
                                        wc.num_bytes_transfered);
                                break;

                        case IBNAL_WID_TX:
                                kibnal_tx_complete(
                                        (kib_tx_t *)kibnal_wreqid2ptr(wc.wr_id),
                                        wc.completion_status);
                                break;

                        case IBNAL_WID_RDMA:
                                /* We only get RDMA completion notification if
                                 * it fails.  So we just ignore them completely
                                 * because...
                                 *
                                 * 1) If an RDMA fails, all subsequent work
                                 * items, including the final SEND, will fail
                                 * too, so I'm still guaranteed to notice that
                                 * this connection is hosed.
                                 *
                                 * 2) It's positively dangerous to look inside
                                 * the tx descriptor obtained from an RDMA work
                                 * item.  As soon as I drop the kib_sched_lock,
                                 * I give a scheduler on another CPU a chance
                                 * to get the final SEND completion, so the tx
                                 * descriptor can get freed as I inspect it. */
                                CERROR ("RDMA failed: %d\n",
                                        wc.completion_status);
                                break;

                        default:
                                LBUG();
                        }

                        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
                        continue;
                }

                /* Nothing to do; sleep... */

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&kibnal_data.kib_sched_waitq, &wait);
                spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                       flags);

                schedule();

                remove_wait_queue(&kibnal_data.kib_sched_waitq, &wait);
                set_current_state(TASK_RUNNING);
                spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
        }

        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);

        kibnal_thread_fini();
        return (0);
}


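/* Method table through which the portals library (lib_nal_t) calls down
 * into this NAL. */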
lib_nal_t kibnal_lib = {
        .libnal_data = &kibnal_data,      /* NAL private data */
        .libnal_send = kibnal_send,
        .libnal_send_pages = kibnal_send_pages,
        .libnal_recv = kibnal_recv,
        .libnal_recv_pages = kibnal_recv_pages,
        .libnal_dist = kibnal_dist
};