lnet/klnds/viblnd/viblnd_cb.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *   Author: Frank Zago <fzago@systemfabricworks.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "vibnal.h"

void
kibnal_tx_done (kib_tx_t *tx)
{
        ptl_err_t        ptlrc = (tx->tx_status == 0) ? PTL_OK : PTL_FAIL;
        int              i;

        LASSERT (!in_interrupt());
        LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting callback */
        LASSERT (!tx->tx_waiting);              /* mustn't be awaiting peer response */

#if !IBNAL_WHOLE_MEM
        switch (tx->tx_mapped) {
        default:
                LBUG();

        case KIB_TX_UNMAPPED:
                break;

        case KIB_TX_MAPPED: {
                vv_return_t      vvrc;

                vvrc = vv_mem_region_destroy(kibnal_data.kib_hca,
                                             tx->tx_md.md_handle);
                LASSERT (vvrc == vv_return_ok);
                tx->tx_mapped = KIB_TX_UNMAPPED;
                break;
        }
        }
#endif
        for (i = 0; i < 2; i++) {
                /* tx may have up to 2 libmsgs to finalise */
                if (tx->tx_libmsg[i] == NULL)
                        continue;

                lib_finalize (&kibnal_lib, NULL, tx->tx_libmsg[i], ptlrc);
                tx->tx_libmsg[i] = NULL;
        }

        if (tx->tx_conn != NULL) {
                kibnal_conn_decref(tx->tx_conn);
                tx->tx_conn = NULL;
        }

        tx->tx_nwrq = 0;
        tx->tx_status = 0;

        spin_lock(&kibnal_data.kib_tx_lock);

        if (tx->tx_isnblk) {
                list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_nblk_txs);
        } else {
                list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_txs);
                wake_up (&kibnal_data.kib_idle_tx_waitq);
        }

        spin_unlock(&kibnal_data.kib_tx_lock);
}

kib_tx_t *
kibnal_get_idle_tx (int may_block)
{
        kib_tx_t      *tx = NULL;
        ENTRY;

        for (;;) {
                spin_lock(&kibnal_data.kib_tx_lock);

                /* "normal" descriptor is free */
                if (!list_empty (&kibnal_data.kib_idle_txs)) {
                        tx = list_entry (kibnal_data.kib_idle_txs.next,
                                         kib_tx_t, tx_list);
                        break;
                }

                if (!may_block) {
                        /* may dip into reserve pool */
                        if (list_empty (&kibnal_data.kib_idle_nblk_txs)) {
                                CERROR ("reserved tx desc pool exhausted\n");
                                break;
                        }

                        tx = list_entry (kibnal_data.kib_idle_nblk_txs.next,
                                         kib_tx_t, tx_list);
                        break;
                }

                /* block for idle tx */
                spin_unlock(&kibnal_data.kib_tx_lock);

                wait_event (kibnal_data.kib_idle_tx_waitq,
                            !list_empty (&kibnal_data.kib_idle_txs) ||
                            kibnal_data.kib_shutdown);
        }

        if (tx != NULL) {
                list_del (&tx->tx_list);

                /* Allocate a new completion cookie.  It might not be needed,
                 * but we've got a lock right now and we're unlikely to
                 * wrap... */
                tx->tx_cookie = kibnal_data.kib_next_tx_cookie++;
#if !IBNAL_WHOLE_MEM
                LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
#endif
                LASSERT (tx->tx_nwrq == 0);
                LASSERT (tx->tx_sending == 0);
                LASSERT (!tx->tx_waiting);
                LASSERT (tx->tx_status == 0);
                LASSERT (tx->tx_conn == NULL);
                LASSERT (tx->tx_libmsg[0] == NULL);
                LASSERT (tx->tx_libmsg[1] == NULL);
        }

        spin_unlock(&kibnal_data.kib_tx_lock);

        RETURN(tx);
}
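
/* Editor's note: a minimal sketch (not in the original file) of the idle-tx
 * lifecycle implemented above: reserve a descriptor, fill in the message,
 * hand it to a connection, and let kibnal_tx_done() recycle it on
 * completion.  'example_send_noop' is hypothetical; 'conn' is assumed to be
 * a kib_conn_t the caller already holds a reference on. */
#if 0
static void
example_send_noop (kib_conn_t *conn)
{
        kib_tx_t *tx = kibnal_get_idle_tx(0);   /* 0: don't block; may dip
                                                 * into the reserve pool */
        if (tx == NULL)                         /* reserve pool exhausted */
                return;

        kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);
        kibnal_queue_tx(tx, conn);              /* kibnal_tx_done() runs when
                                                 * the send completes */
}
#endif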

int
kibnal_post_rx (kib_rx_t *rx, int credit)
{
        kib_conn_t   *conn = rx->rx_conn;
        int           rc = 0;
        vv_return_t   vvrc;

        LASSERT (!in_interrupt());

        rx->rx_gl = (vv_scatgat_t) {
                .v_address = (void *)((unsigned long)KIBNAL_RX_VADDR(rx)),
                .l_key     = KIBNAL_RX_LKEY(rx),
                .length    = IBNAL_MSG_SIZE,
        };

        rx->rx_wrq = (vv_wr_t) {
                .wr_id                   = kibnal_ptr2wreqid(rx, IBNAL_WID_RX),
                .completion_notification = 1,
                .scatgat_list            = &rx->rx_gl,
                .num_of_data_segments    = 1,
                .wr_type                 = vv_wr_receive,
        };

        LASSERT (conn->ibc_state >= IBNAL_CONN_INIT);
        LASSERT (!rx->rx_posted);

        CDEBUG(D_NET, "posting rx [%d %x %p]\n",
               rx->rx_wrq.scatgat_list->length,
               rx->rx_wrq.scatgat_list->l_key,
               rx->rx_wrq.scatgat_list->v_address);

        if (conn->ibc_state > IBNAL_CONN_ESTABLISHED) {
                /* No more posts for this rx; so lose its ref */
                kibnal_conn_decref(conn);
                return 0;
        }

        rx->rx_posted = 1;

        spin_lock(&conn->ibc_lock);
        /* Serialise vv_post_receive; it's not re-entrant on the same QP */
        vvrc = vv_post_receive(kibnal_data.kib_hca,
                               conn->ibc_qp, &rx->rx_wrq);
        spin_unlock(&conn->ibc_lock);

        if (vvrc == vv_return_ok) {
                if (credit) {
                        spin_lock(&conn->ibc_lock);
                        conn->ibc_outstanding_credits++;
                        spin_unlock(&conn->ibc_lock);

                        kibnal_check_sends(conn);
                }
                return 0;
        }

        CERROR ("post rx -> "LPX64" failed %d\n",
                conn->ibc_peer->ibp_nid, vvrc);
        rc = -EIO;
        kibnal_close_conn(rx->rx_conn, rc);
        /* No more posts for this rx; so lose its ref */
        kibnal_conn_decref(conn);
        return rc;
}
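
/* Editor's note: the wr_id set above appears (from its use in this file) to
 * be the descriptor pointer tagged with a type in its low bits, so the
 * completion handler can recover both, roughly:
 *
 *      wr_id = (__u64)((unsigned long)rx | IBNAL_WID_RX);
 *
 * The actual packing is kibnal_ptr2wreqid(), defined outside this file; the
 * line above is an illustrative guess, not that function's code. */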

int
kibnal_post_receives (kib_conn_t *conn)
{
        int    i;
        int    rc;

        LASSERT (conn->ibc_state < IBNAL_CONN_ESTABLISHED);
        LASSERT (conn->ibc_comms_error == 0);

        for (i = 0; i < IBNAL_RX_MSGS; i++) {
                /* +1 ref for rx desc.  This ref remains until kibnal_post_rx
                 * fails (i.e. actual failure or we're disconnecting) */
                kibnal_conn_addref(conn);
                rc = kibnal_post_rx (&conn->ibc_rxs[i], 0);
                if (rc != 0)
                        return rc;
        }

        return 0;
}

kib_tx_t *
kibnal_find_waiting_tx_locked(kib_conn_t *conn, int txtype, __u64 cookie)
{
        struct list_head   *tmp;

        list_for_each(tmp, &conn->ibc_active_txs) {
                kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);

                LASSERT (tx->tx_sending != 0 || tx->tx_waiting);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_waiting &&
                    tx->tx_msg->ibm_type == txtype)
                        return tx;

                CWARN("Bad completion: %swaiting, type %x (wanted %x)\n",
                      tx->tx_waiting ? "" : "NOT ",
                      tx->tx_msg->ibm_type, txtype);
        }
        return NULL;
}

void
kibnal_handle_completion(kib_conn_t *conn, int txtype, int status, __u64 cookie)
{
        kib_tx_t    *tx;
        int          idle;

        spin_lock(&conn->ibc_lock);

        tx = kibnal_find_waiting_tx_locked(conn, txtype, cookie);
        if (tx == NULL) {
                spin_unlock(&conn->ibc_lock);

                CWARN("Unmatched completion type %x cookie "LPX64
                      " from "LPX64"\n",
                      txtype, cookie, conn->ibc_peer->ibp_nid);
                kibnal_close_conn (conn, -EPROTO);
                return;
        }

        if (tx->tx_status == 0) {               /* success so far */
                if (status < 0) {               /* failed? */
                        tx->tx_status = status;
                } else if (txtype == IBNAL_MSG_GET_REQ) {
                        /* XXX layering violation: set REPLY data length */
                        LASSERT (tx->tx_libmsg[1] != NULL);
                        LASSERT (tx->tx_libmsg[1]->ev.type ==
                                 PTL_EVENT_REPLY_END);

                        tx->tx_libmsg[1]->ev.mlength = status;
                }
        }

        tx->tx_waiting = 0;

        idle = tx->tx_sending == 0;
        if (idle)
                list_del(&tx->tx_list);

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kibnal_tx_done(tx);
}

void
kibnal_send_completion (kib_conn_t *conn, int type, int status, __u64 cookie)
{
        kib_tx_t    *tx = kibnal_get_idle_tx(0);

        if (tx == NULL) {
                CERROR("Can't get tx for completion %x for "LPX64"\n",
                       type, conn->ibc_peer->ibp_nid);
                return;
        }

        tx->tx_msg->ibm_u.completion.ibcm_status = status;
        tx->tx_msg->ibm_u.completion.ibcm_cookie = cookie;
        kibnal_init_tx_msg(tx, type, sizeof(kib_completion_msg_t));

        kibnal_queue_tx(tx, conn);
}

void
kibnal_handle_rx (kib_rx_t *rx)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        int           credits = msg->ibm_credits;
        kib_tx_t     *tx;
        int           rc;

        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);

        CDEBUG (D_NET, "Received %x[%d] from "LPX64"\n",
                msg->ibm_type, credits, conn->ibc_peer->ibp_nid);

        if (credits != 0) {
                /* Have I received credits that will let me send? */
                spin_lock(&conn->ibc_lock);
                conn->ibc_credits += credits;
                spin_unlock(&conn->ibc_lock);

                kibnal_check_sends(conn);
        }

        switch (msg->ibm_type) {
        default:
                CERROR("Bad IBNAL message type %x from "LPX64"\n",
                       msg->ibm_type, conn->ibc_peer->ibp_nid);
                break;

        case IBNAL_MSG_NOOP:
                break;

        case IBNAL_MSG_IMMEDIATE:
                lib_parse(&kibnal_lib, &msg->ibm_u.immediate.ibim_hdr, rx);
                break;

        case IBNAL_MSG_PUT_REQ:
                rx->rx_responded = 0;
                lib_parse(&kibnal_lib, &msg->ibm_u.putreq.ibprm_hdr, rx);
                if (rx->rx_responded)
                        break;

                /* I wasn't asked to transfer any payload data.  This happens
                 * if the PUT didn't match, or got truncated. */
                kibnal_send_completion(rx->rx_conn, IBNAL_MSG_PUT_NAK, 0,
                                       msg->ibm_u.putreq.ibprm_cookie);
                break;

        case IBNAL_MSG_PUT_NAK:
                CWARN ("PUT_NACK from "LPX64"\n", conn->ibc_peer->ibp_nid);
                kibnal_handle_completion(conn, IBNAL_MSG_PUT_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBNAL_MSG_PUT_ACK:
                spin_lock(&conn->ibc_lock);
                tx = kibnal_find_waiting_tx_locked(conn, IBNAL_MSG_PUT_REQ,
                                                   msg->ibm_u.putack.ibpam_src_cookie);
                if (tx != NULL)
                        list_del(&tx->tx_list);
                spin_unlock(&conn->ibc_lock);

                if (tx == NULL) {
                        CERROR("Unmatched PUT_ACK from "LPX64"\n",
                               conn->ibc_peer->ibp_nid);
                        kibnal_close_conn(conn, -EPROTO);
                        break;
                }

                LASSERT (tx->tx_waiting);
                /* CAVEAT EMPTOR: I could be racing with tx_complete, but...
                 * (a) I can overwrite tx_msg since my peer has received it!
                 * (b) while tx_waiting is set, tx_complete() won't touch it.
                 */

                tx->tx_nwrq = 0;                /* overwrite PUT_REQ */

                rc = kibnal_init_rdma(tx, IBNAL_MSG_PUT_DONE,
                                      kibnal_rd_size(&msg->ibm_u.putack.ibpam_rd),
                                      &msg->ibm_u.putack.ibpam_rd,
                                      msg->ibm_u.putack.ibpam_dst_cookie);
                if (rc < 0)
                        CERROR("Can't setup rdma for PUT to "LPX64": %d\n",
                               conn->ibc_peer->ibp_nid, rc);

                spin_lock(&conn->ibc_lock);
                if (tx->tx_status == 0 && rc < 0)
                        tx->tx_status = rc;
                tx->tx_waiting = 0;             /* clear waiting and queue atomically */
                kibnal_queue_tx_locked(tx, conn);
                spin_unlock(&conn->ibc_lock);
                break;

        case IBNAL_MSG_PUT_DONE:
                kibnal_handle_completion(conn, IBNAL_MSG_PUT_ACK,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBNAL_MSG_GET_REQ:
                rx->rx_responded = 0;
                lib_parse(&kibnal_lib, &msg->ibm_u.get.ibgm_hdr, rx);
                if (rx->rx_responded)           /* I responded to the GET_REQ */
                        break;
                /* NB GET didn't match (I'd have responded even with no payload
                 * data) */
                kibnal_send_completion(rx->rx_conn, IBNAL_MSG_GET_DONE, -ENODATA,
                                       msg->ibm_u.get.ibgm_cookie);
                break;

        case IBNAL_MSG_GET_DONE:
                kibnal_handle_completion(conn, IBNAL_MSG_GET_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;
        }

        kibnal_post_rx(rx, 1);
}
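
/* Editor's summary of the message flows dispatched above (inferred from
 * this file only, not from any protocol document):
 *
 *   IMMEDIATE:  payload travels inside the message itself
 *   PUT:        PUT_REQ -> PUT_ACK -> RDMA write -> PUT_DONE
 *               (PUT_NAK replaces PUT_ACK when the PUT doesn't match)
 *   GET:        GET_REQ -> RDMA write of the reply -> GET_DONE
 *               (GET_DONE carries -ENODATA when the GET doesn't match)
 *
 * Every receive ends by re-posting its buffer with credit == 1, which is
 * what feeds ibc_outstanding_credits back to the peer. */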

void
kibnal_rx_complete (kib_rx_t *rx, vv_comp_status_t vvrc, int nob)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        unsigned long flags;
        int           rc;

        CDEBUG (D_NET, "rx %p conn %p\n", rx, conn);
        LASSERT (rx->rx_posted);
        rx->rx_posted = 0;

        if (conn->ibc_state > IBNAL_CONN_ESTABLISHED)
                goto ignore;

        if (vvrc != vv_comp_status_success) {
                CERROR("Rx from "LPX64" failed: %d\n",
                       conn->ibc_peer->ibp_nid, vvrc);
                goto failed;
        }

        rc = kibnal_unpack_msg(msg, nob);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from "LPX64"\n",
                        rc, conn->ibc_peer->ibp_nid);
                goto failed;
        }

        if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
            msg->ibm_srcstamp != conn->ibc_incarnation ||
            msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid ||
            msg->ibm_dststamp != kibnal_data.kib_incarnation) {
                CERROR ("Stale rx from "LPX64"\n",
                        conn->ibc_peer->ibp_nid);
                goto failed;
        }

        /* racing with connection establishment/teardown! */

        if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
                /* must check holding global lock to eliminate race */
                if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                        list_add_tail(&rx->rx_list, &conn->ibc_early_rxs);
                        write_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                                flags);
                        return;
                }
                write_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                        flags);
        }
        kibnal_handle_rx(rx);
        return;

 failed:
        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        kibnal_close_conn(conn, -EIO);
 ignore:
        /* Don't re-post rx & drop its ref on conn */
        kibnal_conn_decref(conn);
}

#if IBNAL_WHOLE_MEM
int
kibnal_append_rdfrag(kib_rdma_desc_t *rd, int active, struct page *page,
                     unsigned long page_offset, unsigned long len)
{
        kib_rdma_frag_t *frag = &rd->rd_frags[rd->rd_nfrag];
        vv_l_key_t       l_key;
        vv_r_key_t       r_key;
        __u64            addr;
        __u64            frag_addr;
        void            *ptr;
        vv_mem_reg_h_t   mem_h;
        vv_return_t      vvrc;

        if (rd->rd_nfrag >= IBNAL_MAX_RDMA_FRAGS) {
                CERROR ("Too many RDMA fragments\n");
                return -EMSGSIZE;
        }

#if CONFIG_HIGHMEM
# error "This probably doesn't work because of over/underflow when casting between __u64 and void *..."
#endif
        /* Try to create an address that adapter-tavor will munge into a valid
         * network address, given how it maps all phys mem into 1 region */
        addr = page_to_phys(page) + page_offset + PAGE_OFFSET;

        vvrc = vv_get_gen_mr_attrib(kibnal_data.kib_hca,
                                    (void *)((unsigned long)addr),
                                    len, &mem_h, &l_key, &r_key);
        LASSERT (vvrc == vv_return_ok);

        if (active) {
                if (rd->rd_nfrag == 0) {
                        rd->rd_key = l_key;
                } else if (l_key != rd->rd_key) {
                        CERROR ("> 1 key for single RDMA desc\n");
                        return -EINVAL;
                }
                frag_addr = addr;
        } else {
                if (rd->rd_nfrag == 0) {
                        rd->rd_key = r_key;
                } else if (r_key != rd->rd_key) {
                        CERROR ("> 1 key for single RDMA desc\n");
                        return -EINVAL;
                }
                vv_va2advertise_addr(kibnal_data.kib_hca,
                                     (void *)((unsigned long)addr), &ptr);
                frag_addr = (unsigned long)ptr;
        }

        kibnal_rf_set(frag, frag_addr, len);

        CDEBUG(D_NET,"map frag [%d][%d %x %08x%08x] "LPX64"\n",
               rd->rd_nfrag, frag->rf_nob, rd->rd_key,
               frag->rf_addr_hi, frag->rf_addr_lo, frag_addr);

        rd->rd_nfrag++;
        return 0;
}

struct page *
kibnal_kvaddr_to_page (unsigned long vaddr)
{
        struct page *page;

        if (vaddr >= VMALLOC_START &&
            vaddr < VMALLOC_END)
                page = vmalloc_to_page ((void *)vaddr);
#if CONFIG_HIGHMEM
        else if (vaddr >= PKMAP_BASE &&
                 vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
                page = vmalloc_to_page ((void *)vaddr);
        /* in 2.4 ^ just walks the page tables */
#endif
        else
                page = virt_to_page (vaddr);

        return VALID_PAGE(page) ? page : NULL;
}

int
kibnal_setup_rd_iov(kib_tx_t *tx, kib_rdma_desc_t *rd,
                    vv_access_con_bit_mask_t access,
                    int niov, struct iovec *iov, int offset, int nob)

{
        /* active if I'm sending */
        int           active = ((access & vv_acc_r_mem_write) == 0);
        int           fragnob;
        int           rc;
        unsigned long vaddr;
        struct page  *page;
        int           page_offset;

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT ((rd != tx->tx_rd) == !active);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        rd->rd_nfrag = 0;
        do {
                LASSERT (niov > 0);

                vaddr = ((unsigned long)iov->iov_base) + offset;
                page_offset = vaddr & (PAGE_SIZE - 1);
                page = kibnal_kvaddr_to_page(vaddr);
                if (page == NULL) {
                        CERROR ("Can't find page\n");
                        return -EFAULT;
                }

                fragnob = min((int)(iov->iov_len - offset), nob);
                fragnob = min(fragnob, (int)PAGE_SIZE - page_offset);

                rc = kibnal_append_rdfrag(rd, active, page,
                                          page_offset, fragnob);
                if (rc != 0)
                        return rc;

                if (offset + fragnob < iov->iov_len) {
                        offset += fragnob;
                } else {
                        offset = 0;
                        iov++;
                        niov--;
                }
                nob -= fragnob;
        } while (nob > 0);

        return 0;
}
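
/* Editor's worked example (hypothetical numbers) of the fragment walk
 * above, assuming PAGE_SIZE == 4096: a single iov of 10000 bytes starting
 * at page offset 3000 maps as
 *
 *   frag 0: 1096 bytes (offset 3000..4095 of page 0)
 *   frag 1: 4096 bytes (all of page 1)
 *   frag 2: 4096 bytes (all of page 2)
 *   frag 3:  712 bytes (start of page 3)
 *
 * i.e. fragnob is clipped both to the iov remainder and to the page end. */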

int
kibnal_setup_rd_kiov (kib_tx_t *tx, kib_rdma_desc_t *rd,
                      vv_access_con_bit_mask_t access,
                      int nkiov, ptl_kiov_t *kiov, int offset, int nob)
{
        /* active if I'm sending */
        int            active = ((access & vv_acc_r_mem_write) == 0);
        int            fragnob;
        int            rc;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT ((rd != tx->tx_rd) == !active);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        rd->rd_nfrag = 0;
        do {
                LASSERT (nkiov > 0);
                fragnob = min((int)(kiov->kiov_len - offset), nob);

                rc = kibnal_append_rdfrag(rd, active, kiov->kiov_page,
                                          kiov->kiov_offset + offset,
                                          fragnob);
                if (rc != 0)
                        return rc;

                offset = 0;
                kiov++;
                nkiov--;
                nob -= fragnob;
        } while (nob > 0);

        return 0;
}
#else
int
kibnal_setup_rd_iov (kib_tx_t *tx, kib_rdma_desc_t *rd,
                     vv_access_con_bit_mask_t access,
                     int niov, struct iovec *iov, int offset, int nob)

{
        /* active if I'm sending */
        int         active = ((access & vv_acc_r_mem_write) == 0);
        void       *vaddr;
        vv_return_t vvrc;

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
        LASSERT ((rd != tx->tx_rd) == !active);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR ("Can't map multiple vaddr fragments\n");
                return (-EMSGSIZE);
        }

        vaddr = (void *)(((unsigned long)iov->iov_base) + offset);
        tx->tx_md.md_addr = (__u64)((unsigned long)vaddr);

        vvrc = vv_mem_region_register(kibnal_data.kib_hca, vaddr, nob,
                                      kibnal_data.kib_pd, access,
                                      &tx->tx_md.md_handle,
                                      &tx->tx_md.md_lkey,
                                      &tx->tx_md.md_rkey);
        if (vvrc != vv_return_ok) {
                CERROR ("Can't map vaddr %p: %d\n", vaddr, vvrc);
                return -EFAULT;
        }

        tx->tx_mapped = KIB_TX_MAPPED;

        rd->rd_key = active ? tx->tx_md.md_lkey : tx->tx_md.md_rkey;
        rd->rd_nfrag = 1;
        kibnal_rf_set(&rd->rd_frags[0], tx->tx_md.md_addr, nob);

        return (0);
}

int
kibnal_setup_rd_kiov (kib_tx_t *tx, kib_rdma_desc_t *rd,
                      vv_access_con_bit_mask_t access,
                      int nkiov, ptl_kiov_t *kiov, int offset, int nob)
{
        /* active if I'm sending */
        int            active = ((access & vv_acc_r_mem_write) == 0);
        vv_return_t    vvrc;
        vv_phy_list_t  phys_pages;
        vv_phy_buf_t  *phys;
        int            page_offset;
        int            nphys;
        int            resid;
        int            phys_size;
        int            rc;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
        LASSERT ((rd != tx->tx_rd) == !active);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        phys_size = nkiov * sizeof (*phys);
        PORTAL_ALLOC(phys, phys_size);
        if (phys == NULL) {
                CERROR ("Can't allocate tmp phys\n");
                return (-ENOMEM);
        }

        page_offset = kiov->kiov_offset + offset;

        phys[0].start = kibnal_page2phys(kiov->kiov_page);
        phys[0].size = PAGE_SIZE;

        nphys = 1;
        resid = nob - (kiov->kiov_len - offset);

        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        int i;
                        /* Can't have gaps */
                        CERROR ("Can't make payload contiguous in I/O VM:"
                                "page %d, offset %d, len %d \n", nphys,
                                kiov->kiov_offset, kiov->kiov_len);

                        for (i = -nphys; i < nkiov; i++)
                                CERROR("kiov[%d] %p +%d for %d\n",
                                       i, kiov[i].kiov_page,
                                       kiov[i].kiov_offset,
                                       kiov[i].kiov_len);

                        rc = -EINVAL;
                        goto out;
                }

                LASSERT (nphys * sizeof (*phys) < phys_size);
                phys[nphys].start = kibnal_page2phys(kiov->kiov_page);
                phys[nphys].size = PAGE_SIZE;

                nphys++;
                resid -= PAGE_SIZE;
        }

#if 0
        CWARN ("nphys %d, nob %d, page_offset %d\n", nphys, nob, page_offset);
        for (i = 0; i < nphys; i++)
                CWARN ("   [%d] "LPX64"\n", i, phys[i]);
#endif

        vvrc = vv_phy_mem_region_register(kibnal_data.kib_hca,
                                          &phys_pages,
                                          IBNAL_RDMA_BASE,
                                          nphys,
                                          page_offset,
                                          kibnal_data.kib_pd,
                                          access,
                                          &tx->tx_md.md_handle,
                                          &tx->tx_md.md_addr,
                                          &tx->tx_md.md_lkey,
                                          &tx->tx_md.md_rkey);

        if (vvrc != vv_return_ok) {
                CERROR ("Can't map phys: %d\n", vvrc);
                rc = -EFAULT;
                goto out;
        }

        CDEBUG(D_NET, "Mapped %d pages %d bytes @ offset %d: "
               "lkey %x, rkey %x, addr "LPX64"\n",
               nphys, nob, page_offset, tx->tx_md.md_lkey, tx->tx_md.md_rkey,
               tx->tx_md.md_addr);

        tx->tx_mapped = KIB_TX_MAPPED;
        rc = 0;

        rd->rd_key = active ? tx->tx_md.md_lkey : tx->tx_md.md_rkey;
        rd->rd_nfrag = 1;
        kibnal_rf_set(&rd->rd_frags[0], tx->tx_md.md_addr, nob);

 out:
        PORTAL_FREE(phys, phys_size);
        return (rc);
}
#endif

kib_conn_t *
kibnal_find_conn_locked (kib_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->ibp_conns) {
                return (list_entry(tmp, kib_conn_t, ibc_list));
        }

        return (NULL);
}

void
kibnal_check_sends (kib_conn_t *conn)
{
        kib_tx_t       *tx;
        vv_return_t     vvrc;
        int             rc;
        int             i;
        int             done;

        /* Don't send anything until after the connection is established */
        if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                CDEBUG(D_NET, LPX64": too soon\n", conn->ibc_peer->ibp_nid);
                return;
        }

        spin_lock(&conn->ibc_lock);

        LASSERT (conn->ibc_nsends_posted <= IBNAL_MSG_QUEUE_SIZE);

        if (list_empty(&conn->ibc_tx_queue) &&
            conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER) {
                spin_unlock(&conn->ibc_lock);

                tx = kibnal_get_idle_tx(0);     /* don't block */
                if (tx != NULL)
                        kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);

                spin_lock(&conn->ibc_lock);

                if (tx != NULL)
                        kibnal_queue_tx_locked(tx, conn);
        }

        while (!list_empty (&conn->ibc_tx_queue)) {
                tx = list_entry (conn->ibc_tx_queue.next, kib_tx_t, tx_list);

                /* We rely on this for QP sizing */
                LASSERT (tx->tx_nwrq > 0 && tx->tx_nwrq <= 1 + IBNAL_MAX_RDMA_FRAGS);

                LASSERT (conn->ibc_outstanding_credits >= 0);
                LASSERT (conn->ibc_outstanding_credits <= IBNAL_MSG_QUEUE_SIZE);
                LASSERT (conn->ibc_credits >= 0);
                LASSERT (conn->ibc_credits <= IBNAL_MSG_QUEUE_SIZE);

                if (conn->ibc_nsends_posted == IBNAL_MSG_QUEUE_SIZE) {
                        CDEBUG(D_NET, LPX64": posted enough\n",
                               conn->ibc_peer->ibp_nid);
                        break;
                }

                if (conn->ibc_credits == 0) {   /* no credits */
                        CDEBUG(D_NET, LPX64": no credits\n",
                               conn->ibc_peer->ibp_nid);
                        break;
                }

                if (conn->ibc_credits == 1 &&   /* last credit reserved for */
                    conn->ibc_outstanding_credits == 0) { /* giving back credits */
                        CDEBUG(D_NET, LPX64": not using last credit\n",
                               conn->ibc_peer->ibp_nid);
                        break;
                }

                list_del (&tx->tx_list);

                if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
                    (!list_empty(&conn->ibc_tx_queue) ||
                     conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER)) {
                        /* redundant NOOP */
                        spin_unlock(&conn->ibc_lock);
                        kibnal_tx_done(tx);
                        spin_lock(&conn->ibc_lock);
                        CDEBUG(D_NET, LPX64": redundant noop\n",
                               conn->ibc_peer->ibp_nid);
                        continue;
                }

                kibnal_pack_msg(tx->tx_msg, conn->ibc_outstanding_credits,
                                conn->ibc_peer->ibp_nid, conn->ibc_incarnation);

                conn->ibc_outstanding_credits = 0;
                conn->ibc_nsends_posted++;
                conn->ibc_credits--;

                /* CAVEAT EMPTOR!  This tx could be the PUT_DONE of an RDMA
                 * PUT.  If so, it was first queued here as a PUT_REQ, sent and
                 * stashed on ibc_active_txs, matched by an incoming PUT_ACK,
                 * and then re-queued here.  It's (just) possible that
                 * tx_sending is non-zero if we've not done the tx_complete() from
                 * the first send; hence the += rather than = below. */
                tx->tx_sending++;

                list_add (&tx->tx_list, &conn->ibc_active_txs);

                /* Keep holding ibc_lock while posting sends on this
                 * connection; vv_post_send() isn't re-entrant on the same
                 * QP!! */

                LASSERT (tx->tx_nwrq > 0);

                rc = -ECONNABORTED;
                vvrc = vv_return_ok;
                if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                        tx->tx_status = 0;
#if 1
                        vvrc = vv_post_send_list(kibnal_data.kib_hca,
                                                 conn->ibc_qp,
                                                 tx->tx_nwrq,
                                                 tx->tx_wrq,
                                                 vv_operation_type_send_rc);
                        rc = (vvrc == vv_return_ok) ? 0 : -EIO;
#else
                        /* Only post 1 item at a time for now (so we know
                         * exactly how many got posted successfully) */
                        for (i = 0; i < tx->tx_nwrq; i++) {
                                switch (tx->tx_wrq[i].wr_type) {
                                case vv_wr_send:
                                        CDEBUG(D_NET, "[%d]posting send [%d %x %p]%s: %x\n",
                                               i,
                                               tx->tx_wrq[i].scatgat_list->length,
                                               tx->tx_wrq[i].scatgat_list->l_key,
                                               tx->tx_wrq[i].scatgat_list->v_address,
                                               tx->tx_wrq[i].type.send.send_qp_type.rc_type.fance_indicator ?
                                               "(fence)":"",
                                               tx->tx_msg->ibm_type);
                                        break;
                                case vv_wr_rdma_write:
                                        CDEBUG(D_NET, "[%d]posting PUT  [%d %x %p]->[%x "LPX64"]\n",
                                               i,
                                               tx->tx_wrq[i].scatgat_list->length,
                                               tx->tx_wrq[i].scatgat_list->l_key,
                                               tx->tx_wrq[i].scatgat_list->v_address,
                                               tx->tx_wrq[i].type.send.send_qp_type.rc_type.r_r_key,
                                               tx->tx_wrq[i].type.send.send_qp_type.rc_type.r_addr);
                                        break;
                                case vv_wr_rdma_read:
                                        CDEBUG(D_NET, "[%d]posting GET  [%d %x %p]->[%x "LPX64"]\n",
                                               i,
                                               tx->tx_wrq[i].scatgat_list->length,
                                               tx->tx_wrq[i].scatgat_list->l_key,
                                               tx->tx_wrq[i].scatgat_list->v_address,
                                               tx->tx_wrq[i].type.send.send_qp_type.rc_type.r_r_key,
                                               tx->tx_wrq[i].type.send.send_qp_type.rc_type.r_addr);
                                        break;
                                default:
                                        LBUG();
                                }
                                vvrc = vv_post_send(kibnal_data.kib_hca,
                                                    conn->ibc_qp,
                                                    &tx->tx_wrq[i],
                                                    vv_operation_type_send_rc);
                                CDEBUG(D_NET, LPX64": post %d/%d\n",
                                       conn->ibc_peer->ibp_nid, i, tx->tx_nwrq);
                                if (vvrc != vv_return_ok) {
                                        rc = -EIO;
                                        break;
                                }
                        }
#endif
                }

                if (rc != 0) {
                        /* NB credits are transferred in the actual
                         * message, which can only be the last work item */
                        conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
                        conn->ibc_credits++;
                        conn->ibc_nsends_posted--;

                        tx->tx_status = rc;
                        tx->tx_waiting = 0;
                        tx->tx_sending--;

                        done = (tx->tx_sending == 0);
                        if (done)
                                list_del (&tx->tx_list);

                        spin_unlock(&conn->ibc_lock);

                        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
                                CERROR ("Error %d posting transmit to "LPX64"\n",
                                        vvrc, conn->ibc_peer->ibp_nid);
                        else
                                CDEBUG (D_NET, "Error %d posting transmit to "
                                        LPX64"\n", rc, conn->ibc_peer->ibp_nid);

                        kibnal_close_conn (conn, rc);

                        if (done)
                                kibnal_tx_done (tx);
                        return;
                }
        }

        spin_unlock(&conn->ibc_lock);
}
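
/* Editor's note on the credit accounting above (inferred from this file):
 * ibc_credits counts receive buffers the peer has promised us, spent one
 * per send; ibc_outstanding_credits counts buffers we have re-posted and
 * owe back, piggy-backed on the next outgoing message by kibnal_pack_msg().
 * The "last credit" check refuses to spend credit 1 on a message that
 * returns nothing, since that could leave both sides waiting for credits;
 * the NOOP at the top exists purely to flush credits home once
 * IBNAL_CREDIT_HIGHWATER is reached and the tx queue is otherwise empty. */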

void
kibnal_tx_complete (kib_tx_t *tx, vv_comp_status_t vvrc)
{
        kib_conn_t   *conn = tx->tx_conn;
        int           failed = (vvrc != vv_comp_status_success);
        int           idle;

        CDEBUG(D_NET, "tx %p conn %p sending %d nwrq %d vvrc %d\n",
               tx, conn, tx->tx_sending, tx->tx_nwrq, vvrc);

        LASSERT (tx->tx_sending != 0);

        if (failed &&
            tx->tx_status == 0 &&
            conn->ibc_state == IBNAL_CONN_ESTABLISHED)
                CERROR ("Tx completion to "LPX64" failed: %d\n",
                        conn->ibc_peer->ibp_nid, vvrc);

        spin_lock(&conn->ibc_lock);

        /* I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'. */

        tx->tx_sending--;

        if (failed) {
                tx->tx_waiting = 0;
                tx->tx_status = -EIO;
        }

        idle = (tx->tx_sending == 0) &&         /* This is the final callback */
               !tx->tx_waiting;                 /* Not waiting for peer */
        if (idle)
                list_del(&tx->tx_list);

        kibnal_conn_addref(conn);               /* 1 ref for me.... */

        if (tx->tx_sending == 0)
                conn->ibc_nsends_posted--;

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kibnal_tx_done (tx);

        if (failed)
                kibnal_close_conn (conn, -EIO);
        else
                kibnal_check_sends(conn);

        kibnal_conn_decref(conn);               /* ...until here */
}

void
kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob)
{
        vv_scatgat_t *gl = &tx->tx_gl[tx->tx_nwrq];
        vv_wr_t      *wrq = &tx->tx_wrq[tx->tx_nwrq];
        int           nob = offsetof (kib_msg_t, ibm_u) + body_nob;

        LASSERT (tx->tx_nwrq >= 0 &&
                 tx->tx_nwrq < (1 + IBNAL_MAX_RDMA_FRAGS));
        LASSERT (nob <= IBNAL_MSG_SIZE);

        kibnal_init_msg(tx->tx_msg, type, body_nob);

        *gl = (vv_scatgat_t) {
                .v_address = (void *)((unsigned long)KIBNAL_TX_VADDR(tx)),
                .l_key     = KIBNAL_TX_LKEY(tx),
                .length    = nob,
        };

        memset(wrq, 0, sizeof(*wrq));

        wrq->wr_id = kibnal_ptr2wreqid(tx, IBNAL_WID_TX);
        wrq->wr_type = vv_wr_send;
        wrq->scatgat_list = gl;
        wrq->num_of_data_segments = 1;
        wrq->completion_notification = 1;
        wrq->type.send.solicited_event = 1;
        wrq->type.send.immidiate_data_indicator = 0;
        wrq->type.send.send_qp_type.rc_type.fance_indicator = 0;

        tx->tx_nwrq++;
}

int
kibnal_init_rdma (kib_tx_t *tx, int type, int nob,
                  kib_rdma_desc_t *dstrd, __u64 dstcookie)
{
        /* CAVEAT EMPTOR: this 'consumes' the frags in 'dstrd' */
        int              resid = nob;
        kib_msg_t       *ibmsg = tx->tx_msg;
        kib_rdma_desc_t *srcrd = tx->tx_rd;
        kib_rdma_frag_t *srcfrag;
        int              srcidx;
        kib_rdma_frag_t *dstfrag;
        int              dstidx;
        vv_scatgat_t    *gl;
        vv_wr_t         *wrq;
        int              wrknob;
        int              rc;

        /* Called by scheduler */
        LASSERT (!in_interrupt());

        LASSERT (type == IBNAL_MSG_GET_DONE ||
                 type == IBNAL_MSG_PUT_DONE);

        srcidx = dstidx = 0;
        srcfrag = &srcrd->rd_frags[0];
        dstfrag = &dstrd->rd_frags[0];
        rc = resid;

        while (resid > 0) {
                if (srcidx >= srcrd->rd_nfrag) {
                        CERROR("Src buffer exhausted: %d frags\n", srcidx);
                        rc = -EPROTO;
                        break;
                }

                if (dstidx == dstrd->rd_nfrag) {
                        CERROR("Dst buffer exhausted: %d frags\n", dstidx);
                        rc = -EPROTO;
                        break;
                }

                if (tx->tx_nwrq == IBNAL_MAX_RDMA_FRAGS) {
                        CERROR("RDMA too fragmented: %d/%d src %d/%d dst frags\n",
                               srcidx, srcrd->rd_nfrag,
                               dstidx, dstrd->rd_nfrag);
                        rc = -EMSGSIZE;
                        break;
                }

                wrknob = MIN(MIN(srcfrag->rf_nob, dstfrag->rf_nob), resid);

                gl = &tx->tx_gl[tx->tx_nwrq];
                gl->v_address = (void *)((unsigned long)kibnal_rf_addr(srcfrag));
                gl->length    = wrknob;
                gl->l_key     = srcrd->rd_key;

                wrq = &tx->tx_wrq[tx->tx_nwrq];

                wrq->wr_id = kibnal_ptr2wreqid(tx, IBNAL_WID_RDMA);
                wrq->completion_notification = 0;
                wrq->scatgat_list = gl;
                wrq->num_of_data_segments = 1;
                wrq->wr_type = vv_wr_rdma_write;
                wrq->type.send.solicited_event = 0;
                wrq->type.send.send_qp_type.rc_type.fance_indicator = 0;
                wrq->type.send.send_qp_type.rc_type.r_addr = kibnal_rf_addr(dstfrag);
                wrq->type.send.send_qp_type.rc_type.r_r_key = dstrd->rd_key;

                resid -= wrknob;
                if (wrknob < srcfrag->rf_nob) {
                        /* partially consumed: advance by the bytes sent */
                        kibnal_rf_set(srcfrag,
                                      kibnal_rf_addr(srcfrag) + wrknob,
                                      srcfrag->rf_nob - wrknob);
                } else {
                        srcfrag++;
                        srcidx++;
                }

                if (wrknob < dstfrag->rf_nob) {
                        /* partially consumed: advance by the bytes sent */
                        kibnal_rf_set(dstfrag,
                                      kibnal_rf_addr(dstfrag) + wrknob,
                                      dstfrag->rf_nob - wrknob);
                } else {
                        dstfrag++;
                        dstidx++;
                }

                tx->tx_nwrq++;
        }

        if (rc < 0)                             /* no RDMA if completing with failure */
                tx->tx_nwrq = 0;

        ibmsg->ibm_u.completion.ibcm_status = rc;
        ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;
        kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));

        return rc;
}
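
/* Editor's worked example (hypothetical sizes) of the src/dst pairing
 * above: copying 6000 bytes from src frags [4096, 4096] into dst frags
 * [2048, 8192] produces work requests of
 *
 *   wrq 0: 2048 bytes  (dst frag 0 exhausted; src frag 0 trimmed to 2048)
 *   wrq 1: 2048 bytes  (src frag 0 exhausted; dst frag 1 trimmed to 6144)
 *   wrq 2: 1904 bytes  (resid exhausted)
 *
 * i.e. each wrq moves MIN(src remainder, dst remainder, resid), and a
 * partly-consumed fragment is advanced by wrknob for the next pass. */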

void
kibnal_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
{
        spin_lock(&conn->ibc_lock);
        kibnal_queue_tx_locked (tx, conn);
        spin_unlock(&conn->ibc_lock);

        kibnal_check_sends(conn);
}

void
kibnal_launch_tx (kib_tx_t *tx, ptl_nid_t nid)
{
        kib_peer_t      *peer;
        kib_conn_t      *conn;
        unsigned long    flags;
        rwlock_t        *g_lock = &kibnal_data.kib_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
        LASSERT (tx->tx_nwrq > 0);              /* work items have been set up */

        read_lock_irqsave(g_lock, flags);

        peer = kibnal_find_peer_locked (nid);
        if (peer == NULL) {
                read_unlock_irqrestore(g_lock, flags);
                tx->tx_status = -EHOSTUNREACH;
                kibnal_tx_done (tx);
                return;
        }

        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                kibnal_conn_addref(conn);       /* 1 ref for me... */
                read_unlock_irqrestore(g_lock, flags);

                kibnal_queue_tx (tx, conn);
                kibnal_conn_decref(conn);       /* ...to here */
                return;
        }

        /* Making one or more connections; I'll need a write lock... */
        read_unlock(g_lock);
        write_lock(g_lock);

        peer = kibnal_find_peer_locked (nid);
        if (peer == NULL) {
                write_unlock_irqrestore(g_lock, flags);
                tx->tx_status = -EHOSTUNREACH;
                kibnal_tx_done (tx);
                return;
        }

        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kibnal_conn_addref(conn);       /* 1 ref for me... */
                write_unlock_irqrestore(g_lock, flags);

                kibnal_queue_tx (tx, conn);
                kibnal_conn_decref(conn);       /* ...until here */
                return;
        }

        if (peer->ibp_connecting == 0) {
                if (!time_after_eq(jiffies, peer->ibp_reconnect_time)) {
                        write_unlock_irqrestore(g_lock, flags);
                        tx->tx_status = -EHOSTUNREACH;
                        kibnal_tx_done (tx);
                        return;
                }

                peer->ibp_connecting = 1;
                kibnal_peer_addref(peer); /* extra ref for connd */

                /* irqs are already disabled (g_lock is held with 'flags'
                 * saved above); don't clobber 'flags' with a nested save */
                spin_lock(&kibnal_data.kib_connd_lock);

                list_add_tail (&peer->ibp_connd_list,
                               &kibnal_data.kib_connd_peers);
                wake_up (&kibnal_data.kib_connd_waitq);

                spin_unlock(&kibnal_data.kib_connd_lock);
        }

        /* A connection is being established; queue the message... */
        list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}
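
/* Editor's note: the lookup above is the usual rwlock upgrade pattern:
 * optimistic lookup under the read lock, and if a connection must be
 * created, retake the lock for writing and repeat both lookups, since
 * another thread may have added the peer or connection in the window
 * between the two locks.  Interrupts stay disabled across the
 * read_unlock/write_lock pair, which is why only the final unlock
 * restores 'flags'. */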
1338
1339 int
1340 kibnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
1341 {
1342         /* I would guess that if kibnal_get_peer (nid) == NULL,
1343            and we're not routing, then 'nid' is very distant :) */
1344         if ( nal->libnal_ni.ni_pid.nid == nid ) {
1345                 *dist = 0;
1346         } else {
1347                 *dist = 1;
1348         }
1349
1350         return 0;
1351 }
1352
1353 ptl_err_t
1354 kibnal_sendmsg(lib_nal_t    *nal, 
1355                void         *private,
1356                lib_msg_t    *libmsg,
1357                ptl_hdr_t    *hdr, 
1358                int           type, 
1359                ptl_nid_t     nid, 
1360                ptl_pid_t     pid,
1361                unsigned int  payload_niov, 
1362                struct iovec *payload_iov, 
1363                ptl_kiov_t   *payload_kiov,
1364                int           payload_offset,
1365                int           payload_nob)
1366 {
1367         kib_msg_t  *ibmsg;
1368         kib_tx_t   *tx;
1369         int         nob;
1370         int         rc;
1371         int         n;
1372
1373         /* NB 'private' is different depending on what we're sending.... */
1374
1375         CDEBUG(D_NET, "sending %d bytes in %d frags to nid:"LPX64
1376                " pid %d\n", payload_nob, payload_niov, nid , pid);
1377
1378         LASSERT (payload_nob == 0 || payload_niov > 0);
1379         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1380
1381         /* Thread context */
1382         LASSERT (!in_interrupt());
1383         /* payload is either all vaddrs or all pages */
1384         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1385
1386         switch (type) {
1387         default:
1388                 LBUG();
1389                 return (PTL_FAIL);
1390                 
1391         case PTL_MSG_REPLY: {
1392                 /* reply's 'private' is the incoming receive */
1393                 kib_rx_t *rx = private;
1394
1395                 LASSERT(rx != NULL);
1396
1397                 if (rx->rx_msg->ibm_type == IBNAL_MSG_IMMEDIATE) {
1398                         /* RDMA not expected */
1399                         nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1400                         if (nob > IBNAL_MSG_SIZE) {
1401                                 CERROR("REPLY for "LPX64" too big (RDMA not requested):"
1402                                        "%d (max for message is %d)\n", 
1403                                        nid, payload_nob, IBNAL_MSG_SIZE);
1404                                 CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
1405                                        nob, nid);
1406                                 return PTL_FAIL;
1407                         }
1408                         break;
1409                 }
1410
1411                 /* Incoming message consistent with RDMA? */
1412                 if (rx->rx_msg->ibm_type != IBNAL_MSG_GET_REQ) {
1413                         CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
1414                                nid, rx->rx_msg->ibm_type);
1415                         return PTL_FAIL;
1416                 }
1417
1418                 /* NB rx_complete() will send GET_NAK when I return to it from
1419                  * here, unless I set rx_responded! */
1420
1421                 tx = kibnal_get_idle_tx(0);
1422                 if (tx == NULL) {
1423                         CERROR("Can't get tx for REPLY to "LPX64"\n", nid);
1424                         return PTL_FAIL;
1425                 }
1426
1427                 if (payload_nob == 0)
1428                         rc = 0;
1429                 else if (payload_kiov == NULL)
1430                         rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 0, 
1431                                                  payload_niov, payload_iov, 
1432                                                  payload_offset, payload_nob);
1433                 else
1434                         rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 0,
1435                                                   payload_niov, payload_kiov,
1436                                                   payload_offset, payload_nob);
1437                 if (rc != 0) {
1438                         CERROR("Can't setup GET src for "LPX64": %d\n", nid, rc);
1439                         kibnal_tx_done(tx);
1440                         return PTL_FAIL;
1441                 }
1442                 
1443                 rc = kibnal_init_rdma(tx, IBNAL_MSG_GET_DONE, payload_nob,
1444                                       &rx->rx_msg->ibm_u.get.ibgm_rd,
1445                                       rx->rx_msg->ibm_u.get.ibgm_cookie);
1446                 if (rc < 0) {
1447                         CERROR("Can't setup rdma for GET from "LPX64": %d\n", 
1448                                nid, rc);
1449                 } else if (rc == 0) {
1450                         /* No RDMA: local completion may happen now! */
1451                         lib_finalize (&kibnal_lib, NULL, libmsg, PTL_OK);
1452                 } else {
1453                         /* RDMA: lib_finalize(libmsg) when it completes */
1454                         tx->tx_libmsg[0] = libmsg;
1455                 }
1456
1457                 kibnal_queue_tx(tx, rx->rx_conn);
1458                 rx->rx_responded = 1;
1459                 return (rc >= 0) ? PTL_OK : PTL_FAIL;
1460         }
1461
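             /* For a large GET, the local sink buffer is mapped and described
              * by the rdma descriptor shipped inside GET_REQ; the peer
              * RDMA-writes the reply straight into it and signals completion
              * with GET_DONE (the PTL_MSG_REPLY case above, seen from the
              * peer's side). */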
1462         case PTL_MSG_GET:
1463                 /* will the REPLY message be small enough not to need RDMA? */
1464                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[libmsg->md->length]);
1465                 if (nob <= IBNAL_MSG_SIZE)
1466                         break;
1467
1468                 tx = kibnal_get_idle_tx(1);     /* may block; caller is an app thread */
1469                 LASSERT (tx != NULL);
1470
1471                 ibmsg = tx->tx_msg;
1472                 ibmsg->ibm_u.get.ibgm_hdr = *hdr;
1473                 ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie;
1474
1475                 if ((libmsg->md->options & PTL_MD_KIOV) == 0)
1476                         rc = kibnal_setup_rd_iov(tx, &ibmsg->ibm_u.get.ibgm_rd,
1477                                                  vv_acc_r_mem_write,
1478                                                  libmsg->md->md_niov,
1479                                                  libmsg->md->md_iov.iov,
1480                                                  0, libmsg->md->length);
1481                 else
1482                         rc = kibnal_setup_rd_kiov(tx, &ibmsg->ibm_u.get.ibgm_rd,
1483                                                   vv_acc_r_mem_write,
1484                                                   libmsg->md->md_niov,
1485                                                   libmsg->md->md_iov.kiov,
1486                                                   0, libmsg->md->length);
1487                 if (rc != 0) {
1488                         CERROR("Can't setup GET sink for "LPX64": %d\n", nid, rc);
1489                         kibnal_tx_done(tx);
1490                         return PTL_FAIL;
1491                 }
1492
1493                 n = ibmsg->ibm_u.get.ibgm_rd.rd_nfrag;
1494                 nob = offsetof(kib_get_msg_t, ibgm_rd.rd_frags[n]);
1495                 kibnal_init_tx_msg(tx, IBNAL_MSG_GET_REQ, nob);
1496
1497                 tx->tx_libmsg[1] = lib_create_reply_msg(&kibnal_lib, nid, libmsg);
1498                 if (tx->tx_libmsg[1] == NULL) {
1499                         CERROR("Can't create reply for GET -> "LPX64"\n", nid);
1500                         kibnal_tx_done(tx);
1501                         return PTL_FAIL;
1502                 }
1503
1504                 tx->tx_libmsg[0] = libmsg;      /* finalise libmsg[0,1] on completion */
1505                 tx->tx_waiting = 1;             /* waiting for GET_DONE */
1506                 kibnal_launch_tx(tx, nid);
1507                 return PTL_OK;
1508
1509         case PTL_MSG_ACK:
1510                 LASSERT (payload_nob == 0);
1511                 break;
1512
1513         case PTL_MSG_PUT:
1514                 /* Is the payload small enough not to need RDMA? */
1515                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1516                 if (nob <= IBNAL_MSG_SIZE)
1517                         break;
1518
1519                 tx = kibnal_get_idle_tx(1);     /* may block: caller is app thread */
1520                 LASSERT (tx != NULL);
1521
1522                 if (payload_kiov == NULL)
1523                         rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 0,
1524                                                  payload_niov, payload_iov,
1525                                                  payload_offset, payload_nob);
1526                 else
1527                         rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 0,
1528                                                   payload_niov, payload_kiov,
1529                                                   payload_offset, payload_nob);
1530                 if (rc != 0) {
1531                         CERROR("Can't setup PUT src for "LPX64": %d\n", nid, rc);
1532                         kibnal_tx_done(tx);
1533                         return PTL_FAIL;
1534                 }
1535
1536                 ibmsg = tx->tx_msg;
1537                 ibmsg->ibm_u.putreq.ibprm_hdr = *hdr;
1538                 ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie;
1539                 kibnal_init_tx_msg(tx, IBNAL_MSG_PUT_REQ, sizeof(kib_putreq_msg_t));
1540
1541                 tx->tx_libmsg[0] = libmsg;      /* finalise libmsg on completion */
1542                 tx->tx_waiting = 1;             /* waiting for PUT_{ACK,NAK} */
1543                 kibnal_launch_tx(tx, nid);
1544                 return PTL_OK;
1545         }
1546
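             /* All remaining cases (ACK, and REPLY/PUT/GET small enough to
              * avoid RDMA) fall through to here and go as a single IMMEDIATE
              * message.  ACK and REPLY may not block for a tx descriptor,
              * presumably because they are generated while handling incoming
              * messages. */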
1547         LASSERT (offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob])
1548                  <= IBNAL_MSG_SIZE);
1549
1550         tx = kibnal_get_idle_tx(!(type == PTL_MSG_ACK ||
1551                                   type == PTL_MSG_REPLY));
1552         if (tx == NULL) {
1553                 CERROR ("Can't send %d to "LPX64": tx descs exhausted\n", type, nid);
1554                 return PTL_NO_SPACE;
1555         }
1556
1557         ibmsg = tx->tx_msg;
1558         ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1559
1560         if (payload_nob > 0) {
1561                 if (payload_kiov != NULL)
1562                         lib_copy_kiov2buf(ibmsg->ibm_u.immediate.ibim_payload,
1563                                           payload_niov, payload_kiov,
1564                                           payload_offset, payload_nob);
1565                 else
1566                         lib_copy_iov2buf(ibmsg->ibm_u.immediate.ibim_payload,
1567                                          payload_niov, payload_iov,
1568                                          payload_offset, payload_nob);
1569         }
1570
1571         nob = offsetof(kib_immediate_msg_t, ibim_payload[payload_nob]);
1572         kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE, nob);
1573
1574         tx->tx_libmsg[0] = libmsg;              /* finalise libmsg on completion */
1575         kibnal_launch_tx(tx, nid);
1576         return PTL_OK;
1577 }
1578
1579 ptl_err_t
1580 kibnal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
1581                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1582                unsigned int payload_niov, struct iovec *payload_iov,
1583                size_t payload_offset, size_t payload_len)
1584 {
1585         CDEBUG(D_NET, "  pid = %d, nid="LPU64"\n",
1586                pid, nid);
1587         return (kibnal_sendmsg(nal, private, cookie,
1588                                hdr, type, nid, pid,
1589                                payload_niov, payload_iov, NULL,
1590                                payload_offset, payload_len));
1591 }
1592
1593 ptl_err_t
1594 kibnal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, 
1595                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1596                      unsigned int payload_niov, ptl_kiov_t *payload_kiov, 
1597                      size_t payload_offset, size_t payload_len)
1598 {
1599         return (kibnal_sendmsg(nal, private, cookie,
1600                                hdr, type, nid, pid,
1601                                payload_niov, NULL, payload_kiov,
1602                                payload_offset, payload_len));
1603 }
1604
1605 ptl_err_t
1606 kibnal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
1607                  unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
1608                  size_t offset, int mlen, int rlen)
1609 {
1610         kib_rx_t    *rx = private;
1611         kib_msg_t   *rxmsg = rx->rx_msg;
1612         kib_conn_t  *conn = rx->rx_conn;
1613         kib_tx_t    *tx;
1614         kib_msg_t   *txmsg;
1615         int          nob;
1616         int          rc;
1617         int          n;
1618         
1619         LASSERT (mlen <= rlen);
1620         LASSERT (mlen >= 0);
1621         LASSERT (!in_interrupt());
1622         /* Either all pages or all vaddrs */
1623         LASSERT (!(kiov != NULL && iov != NULL));
1624
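             /* 'private' is the rx that delivered the header; dispatch on the
              * type of the wire message it arrived in. */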
1625         switch (rxmsg->ibm_type) {
1626         default:
1627                 LBUG();
1628                 
1629         case IBNAL_MSG_IMMEDIATE:
1630                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
1631                 if (nob > IBNAL_MSG_SIZE) {
1632                         CERROR ("Immediate message from "LPX64" too big: %d\n",
1633                                 rxmsg->ibm_u.immediate.ibim_hdr.src_nid, rlen);
1634                         return (PTL_FAIL);
1635                 }
1636
1637                 if (kiov != NULL)
1638                         lib_copy_buf2kiov(niov, kiov, offset,
1639                                           rxmsg->ibm_u.immediate.ibim_payload,
1640                                           mlen);
1641                 else
1642                         lib_copy_buf2iov(niov, iov, offset,
1643                                          rxmsg->ibm_u.immediate.ibim_payload,
1644                                          mlen);
1645
1646                 lib_finalize (nal, NULL, libmsg, PTL_OK);
1647                 return (PTL_OK);
1648
1649         case IBNAL_MSG_PUT_REQ:
1650                 /* NB rx_complete() will send PUT_NAK when I return to it from
1651                  * here, unless I set rx_responded!  */
1652
1653                 if (mlen == 0) { /* No payload to RDMA */
1654                         lib_finalize(nal, NULL, libmsg, PTL_OK);
1655                         return PTL_OK;
1656                 }
1657
1658                 tx = kibnal_get_idle_tx(0);
1659                 if (tx == NULL) {
1660                         CERROR("Can't allocate tx for "LPX64"\n",
1661                                conn->ibc_peer->ibp_nid);
1662                         return PTL_FAIL;
1663                 }
1664
1665                 txmsg = tx->tx_msg;
1666                 if (kiov == NULL)
1667                         rc = kibnal_setup_rd_iov(tx, 
1668                                                  &txmsg->ibm_u.putack.ibpam_rd,
1669                                                  vv_acc_r_mem_write,
1670                                                  niov, iov, offset, mlen);
1671                 else
1672                         rc = kibnal_setup_rd_kiov(tx,
1673                                                   &txmsg->ibm_u.putack.ibpam_rd,
1674                                                   vv_acc_r_mem_write,
1675                                                   niov, kiov, offset, mlen);
1676                 if (rc != 0) {
1677                         CERROR("Can't setup PUT sink for "LPX64": %d\n",
1678                                conn->ibc_peer->ibp_nid, rc);
1679                         kibnal_tx_done(tx);
1680                         return PTL_FAIL;
1681                 }
1682
1683                 txmsg->ibm_u.putack.ibpam_src_cookie = rxmsg->ibm_u.putreq.ibprm_cookie;
1684                 txmsg->ibm_u.putack.ibpam_dst_cookie = tx->tx_cookie;
1685
1686                 n = tx->tx_msg->ibm_u.putack.ibpam_rd.rd_nfrag;
1687                 nob = offsetof(kib_putack_msg_t, ibpam_rd.rd_frags[n]);
1688                 kibnal_init_tx_msg(tx, IBNAL_MSG_PUT_ACK, nob);
1689
1690                 tx->tx_libmsg[0] = libmsg;      /* finalise libmsg on completion */
1691                 tx->tx_waiting = 1;             /* waiting for PUT_DONE */
1692                 kibnal_queue_tx(tx, conn);
1693
1694                 LASSERT (!rx->rx_responded);
1695                 rx->rx_responded = 1;
1696                 return PTL_OK;
1697
1698         case IBNAL_MSG_GET_REQ:
1699                 /* We get called here just to discard any junk after the
1700                  * GET hdr; libmsg is NULL (asserted below) and
                      * lib_finalize() is a no-op for a NULL msg, so this just
                      * completes the receive. */
1701                 LASSERT (libmsg == NULL);
1702                 lib_finalize (nal, NULL, libmsg, PTL_OK);
1703                 return (PTL_OK);
1704         }
1705 }
1706
1707 ptl_err_t
1708 kibnal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
1709               unsigned int niov, struct iovec *iov, 
1710               size_t offset, size_t mlen, size_t rlen)
1711 {
1712         return (kibnal_recvmsg (nal, private, msg, niov, iov, NULL,
1713                                 offset, mlen, rlen));
1714 }
1715
1716 ptl_err_t
1717 kibnal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
1718                      unsigned int niov, ptl_kiov_t *kiov, 
1719                      size_t offset, size_t mlen, size_t rlen)
1720 {
1721         return (kibnal_recvmsg (nal, private, msg, niov, NULL, kiov,
1722                                 offset, mlen, rlen));
1723 }
1724
1725 int
1726 kibnal_thread_start (int (*fn)(void *arg), void *arg)
1727 {
1728         long    pid = kernel_thread (fn, arg, 0);
1729
1730         if (pid < 0)
1731                 return ((int)pid);
1732
1733         atomic_inc (&kibnal_data.kib_nthreads);
1734         return (0);
1735 }
1736
1737 void
1738 kibnal_thread_fini (void)
1739 {
1740         atomic_dec (&kibnal_data.kib_nthreads);
1741 }
1742
1743 void
1744 kibnal_close_conn_locked (kib_conn_t *conn, int error)
1745 {
1746         /* This just does the immediate housekeeping.  'error' is zero for a
1747          * normal shutdown, which can only happen once the connection is
1748          * established.  If the connection is established, schedule it to be
1749          * finished off by the connd; otherwise the connd is already dealing
1750          * with it (either setting it up or tearing it down).
1751          * Caller holds kib_global_lock exclusively in irq context */
1752         kib_peer_t   *peer = conn->ibc_peer;
1753
1754         LASSERT (error != 0 || conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
1755
1756         if (error != 0 && conn->ibc_comms_error == 0)
1757                 conn->ibc_comms_error = error;
1758
1759         if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
1760                 return; /* already being handled  */
1761
1762         CDEBUG (error == 0 ? D_NET : D_ERROR,
1763                 "closing conn to "LPX64": error %d\n", peer->ibp_nid, error);
1764
1765         /* connd takes ibc_list's ref */
1766         list_del (&conn->ibc_list);
1767         
1768         if (list_empty (&peer->ibp_conns) &&
1769             peer->ibp_persistence == 0) {
1770                 /* Non-persistent peer with no more conns... */
1771                 kibnal_unlink_peer_locked (peer);
1772         }
1773
1774         kibnal_set_conn_state(conn, IBNAL_CONN_DISCONNECT1);
1775
1776         spin_lock(&kibnal_data.kib_connd_lock);
1777
1778         list_add_tail (&conn->ibc_list, &kibnal_data.kib_connd_conns);
1779         wake_up (&kibnal_data.kib_connd_waitq);
1780                 
1781         spin_unlock(&kibnal_data.kib_connd_lock);
1782 }
1783
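/* Editor's summary of the connection states used in this file:
 *   active:  INIT -> ACTIVE_ARP -> ACTIVE_CONNECT -> ACTIVE_CHECK_REPLY
 *            -> ACTIVE_RTU -> ESTABLISHED
 *   passive: INIT -> PASSIVE_WAIT -> ESTABLISHED
 *   close:   ESTABLISHED -> DISCONNECT1 -> DISCONNECT2 -> DISCONNECTED
 * Setup and teardown are single-threaded in the connd; CM callbacks run in
 * tasklet context and only queue the conn on kib_connd_conns and wake the
 * connd. */
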
1784 void
1785 kibnal_close_conn (kib_conn_t *conn, int error)
1786 {
1787         unsigned long flags;
1788         
1789         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1790
1791         kibnal_close_conn_locked (conn, error);
1792         
1793         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1794 }
1795
1796 void
1797 kibnal_handle_early_rxs(kib_conn_t *conn)
1798 {
1799         unsigned long    flags;
1800         kib_rx_t        *rx;
1801
1802         LASSERT (!in_interrupt());
1803         LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
1804         
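             /* Pop early rxs with the global lock held, but drop the lock
              * around each kibnal_handle_rx() call (rx handling may need the
              * lock itself), then retake it to examine the list again. */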
1805         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1806         while (!list_empty(&conn->ibc_early_rxs)) {
1807                 rx = list_entry(conn->ibc_early_rxs.next,
1808                                 kib_rx_t, rx_list);
1809                 list_del(&rx->rx_list);
1810                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1811                 
1812                 kibnal_handle_rx(rx);
1813                 
1814                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1815         }
1816         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1817 }
1818
1819 void
1820 kibnal_conn_disconnected(kib_conn_t *conn)
1821 {
1822         LIST_HEAD        (zombies); 
1823         struct list_head *tmp;
1824         struct list_head *nxt;
1825         kib_tx_t         *tx;
1826
1827         /* I'm the connd */
1828         LASSERT (!in_interrupt());
1829         LASSERT (current == kibnal_data.kib_connd);
1830         LASSERT (conn->ibc_state >= IBNAL_CONN_INIT);
1831         
1832         kibnal_set_conn_state(conn, IBNAL_CONN_DISCONNECTED);
1833
1834         /* move QP to error state to make posted work items complete */
1835         kibnal_set_qp_state(conn, vv_qp_state_error);
1836
1837         spin_lock(&conn->ibc_lock);
1838
1839         /* Complete all tx descs not waiting for sends to complete.
1840          * NB we should be safe from RDMA now that the QP has changed state */
1841
1842         list_for_each_safe (tmp, nxt, &conn->ibc_tx_queue) {
1843                 tx = list_entry (tmp, kib_tx_t, tx_list);
1844
1845                 tx->tx_status = -ECONNABORTED;
1846                 tx->tx_waiting = 0;
1847                 
1848                 if (tx->tx_sending != 0)
1849                         continue;
1850
1851                 list_del (&tx->tx_list);
1852                 list_add (&tx->tx_list, &zombies);
1853         }
1854
1855         list_for_each_safe (tmp, nxt, &conn->ibc_active_txs) {
1856                 tx = list_entry (tmp, kib_tx_t, tx_list);
1857
1858                 LASSERT (tx->tx_waiting ||
1859                          tx->tx_sending != 0);
1860
1861                 tx->tx_status = -ECONNABORTED;
1862                 tx->tx_waiting = 0;
1863                 
1864                 if (tx->tx_sending != 0)
1865                         continue;
1866
1867                 list_del (&tx->tx_list);
1868                 list_add (&tx->tx_list, &zombies);
1869         }
1870         
1871         spin_unlock(&conn->ibc_lock);
1872
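             /* Reap the zombies collected above.  Txs with tx_sending != 0
              * were deliberately left on the lists; moving the QP to the
              * error state flushes their work items, and the send completion
              * path finalises them. */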
1873         while (!list_empty(&zombies)) {
1874                 tx = list_entry (zombies.next, kib_tx_t, tx_list);
1875
1876                 list_del(&tx->tx_list);
1877                 kibnal_tx_done (tx);
1878         }
1879
1880         kibnal_handle_early_rxs(conn);
1881 }
1882
1883 void
1884 kibnal_peer_connect_failed (kib_peer_t *peer, int active)
1885 {
1886         LIST_HEAD        (zombies);     /* NB must be initialised: tested
                                              * below even when nothing is
                                              * stolen onto it */
1887         kib_tx_t         *tx;
1888         unsigned long     flags;
1889
1890         /* Only the connd creates conns => single threaded */
1891         LASSERT (!in_interrupt());
1892         LASSERT (current == kibnal_data.kib_connd);
1893         LASSERT (peer->ibp_reconnect_interval >= IBNAL_MIN_RECONNECT_INTERVAL);
1894
1895         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1896
1897         if (active) {
1898                 LASSERT (peer->ibp_connecting != 0);
1899                 peer->ibp_connecting--;
1900         } else {
1901                 LASSERT (!kibnal_peer_active(peer));
1902         }
1903         
1904         if (peer->ibp_connecting != 0) {
1905                 /* another connection attempt under way (loopback?)... */
1906                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1907                 return;
1908         }
1909
1910         if (list_empty(&peer->ibp_conns)) {
1911                 /* Say when active connection can be re-attempted */
1912                 peer->ibp_reconnect_time = jiffies + peer->ibp_reconnect_interval;
1913                 /* Increase reconnection interval */
1914                 peer->ibp_reconnect_interval = MIN (peer->ibp_reconnect_interval * 2,
1915                                                     IBNAL_MAX_RECONNECT_INTERVAL);
1916         
1917                 /* Take peer's blocked transmits to complete with error */
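                     /* (list "steal" idiom: list_add() threads 'zombies' into
                      * the queue's ring, then list_del_init() unhooks the old
                      * head, leaving the whole queue chained on 'zombies' in
                      * one O(1) splice) */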
1918                 list_add(&zombies, &peer->ibp_tx_queue);
1919                 list_del_init(&peer->ibp_tx_queue);
1920                 
1921                 if (kibnal_peer_active(peer) &&
1922                     (peer->ibp_persistence == 0)) {
1923                         /* failed connection attempt on non-persistent peer */
1924                         kibnal_unlink_peer_locked (peer);
1925                 }
1926         } else {
1927                 /* Can't have blocked transmits if there are connections */
1928                 LASSERT (list_empty(&peer->ibp_tx_queue));
1929         }
1930         
1931         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1932
1933         if (list_empty (&zombies)) 
1934                 return;
1935         
1936         CERROR ("Deleting messages for "LPX64": connection failed\n", peer->ibp_nid);
1937         do {
1938                 tx = list_entry (zombies.next, kib_tx_t, tx_list);
1939
1940                 list_del (&tx->tx_list);
1941                 /* complete now */
1942                 tx->tx_status = -EHOSTUNREACH;
1943                 kibnal_tx_done (tx);
1944         } while (!list_empty (&zombies));
1945 }
1946
1947 void
1948 kibnal_connreq_done(kib_conn_t *conn, int active, int status)
1949 {
1950         static cm_reject_data_t   rej;
1951
1952         struct list_head   txs;
1953         kib_peer_t        *peer = conn->ibc_peer;
1954         kib_peer_t        *peer2;
1955         unsigned long      flags;
1956         kib_tx_t          *tx;
1957
1958         /* Only the connd creates conns => single threaded */
1959         LASSERT (!in_interrupt());
1960         LASSERT (current == kibnal_data.kib_connd);
1961         LASSERT (conn->ibc_state < IBNAL_CONN_ESTABLISHED);
1962
1963         if (active) {
1964                 LASSERT (peer->ibp_connecting > 0);
1965         } else {
1966                 LASSERT (!kibnal_peer_active(peer));
1967         }
1968         
1969         PORTAL_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
1970         conn->ibc_connvars = NULL;
1971
1972         if (status != 0) {
1973                 /* failed to establish connection */
1974                 switch (conn->ibc_state) {
1975                 default:
1976                         LBUG();
1977                 case IBNAL_CONN_ACTIVE_CHECK_REPLY:
1978                         /* got a connection reply but failed checks */
1979                         LASSERT (active);
1980                         memset(&rej, 0, sizeof(rej));
1981                         rej.reason = cm_rej_code_usr_rej;
1982                         cm_reject(conn->ibc_cep, &rej);
1983                         break;
1984
1985                 case IBNAL_CONN_ACTIVE_CONNECT:
1986                         LASSERT (active);
1987                         cm_cancel(conn->ibc_cep);
1988                         kibnal_pause(HZ/10);
1989                         /* cm_connect() failed immediately or
1990                          * callback returned failure */
1991                         break;
1992
1993                 case IBNAL_CONN_ACTIVE_ARP:
1994                         LASSERT (active);
1995                         /* ibat_get_ib_data() failed immediately 
1996                          * or callback returned failure */
1997                         break;
1998
1999                 case IBNAL_CONN_INIT:
2000                         break;
2001
2002                 case IBNAL_CONN_PASSIVE_WAIT:
2003                         LASSERT (!active);
2004                         /* cm_accept callback returned failure */
2005                         break;
2006                 }
2007
2008                 kibnal_peer_connect_failed(conn->ibc_peer, active);
2009                 kibnal_conn_disconnected(conn);
2010                 return;
2011         }
2012
2013         /* connection established */
2014         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2015
2016         if (active) {
2017                 LASSERT(conn->ibc_state == IBNAL_CONN_ACTIVE_RTU);
2018         } else {
2019                 LASSERT(conn->ibc_state == IBNAL_CONN_PASSIVE_WAIT);
2020         }
2021         
2022         kibnal_set_conn_state(conn, IBNAL_CONN_ESTABLISHED);
2023
2024         if (!active) {
2025                 peer2 = kibnal_find_peer_locked(peer->ibp_nid);
2026                 if (peer2 != NULL) {
2027                         /* already in the peer table; swap */
2028                         conn->ibc_peer = peer2;
2029                         kibnal_peer_addref(peer2);
2030                         kibnal_peer_decref(peer);
2031                         peer = conn->ibc_peer;
2032                 } else {
2033                         /* add 'peer' to the peer table */
2034                         kibnal_peer_addref(peer);
2035                         list_add_tail(&peer->ibp_list,
2036                                       kibnal_nid2peerlist(peer->ibp_nid));
2037                 }
2038         }
2039         
2040         /* Add conn to peer's list and nuke any dangling conns from a different
2041          * peer instance... */
2042         kibnal_conn_addref(conn);               /* +1 ref for ibc_list */
2043         list_add(&conn->ibc_list, &peer->ibp_conns);
2044         kibnal_close_stale_conns_locked (conn->ibc_peer,
2045                                          conn->ibc_incarnation);
2046
2047         if (!kibnal_peer_active(peer) ||        /* peer has been deleted */
2048             conn->ibc_comms_error != 0 ||       /* comms error */
2049             conn->ibc_disconnect) {             /* need to disconnect */
2050                 
2051                 /* start to shut down connection */
2052                 kibnal_close_conn_locked(conn, -ECONNABORTED);
2053
2054                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2055                 kibnal_peer_connect_failed(peer, active);
2056                 return;
2057         }
2058
2059         if (active)
2060                 peer->ibp_connecting--;
2061
2062         /* grab pending txs while I have the lock */
2063         list_add(&txs, &peer->ibp_tx_queue);
2064         list_del_init(&peer->ibp_tx_queue);
2065         
2066         /* reset reconnect interval for next attempt */
2067         peer->ibp_reconnect_interval = IBNAL_MIN_RECONNECT_INTERVAL;
2068         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2069
2070         /* Schedule blocked txs */
2071         spin_lock (&conn->ibc_lock);
2072         while (!list_empty (&txs)) {
2073                 tx = list_entry (txs.next, kib_tx_t, tx_list);
2074                 list_del (&tx->tx_list);
2075
2076                 kibnal_queue_tx_locked (tx, conn);
2077         }
2078         spin_unlock (&conn->ibc_lock);
2079         kibnal_check_sends (conn);
2080
2081         /* schedule blocked rxs */
2082         kibnal_handle_early_rxs(conn);
2083 }
2084
2085 void
2086 kibnal_cm_callback(cm_cep_handle_t cep, cm_conn_data_t *cmdata, void *arg)
2087 {
2088         static cm_dreply_data_t drep;           /* just zeroed space */
2089         
2090         kib_conn_t             *conn = (kib_conn_t *)arg;
2091         unsigned long           flags;
2092         
2093         /* CAVEAT EMPTOR: tasklet context */
2094
2095         switch (cmdata->status) {
2096         default:
2097                 LBUG();
2098                 
2099         case cm_event_disconn_request:
2100                 /* IBNAL_CONN_ACTIVE_RTU:  gets closed in kibnal_connreq_done
2101                  * IBNAL_CONN_ESTABLISHED: I start it closing
2102                  * otherwise:              it's closing anyway */
2103                 cm_disconnect(conn->ibc_cep, NULL, &drep);
2104                 cm_cancel(conn->ibc_cep);
2105
2106                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2107                 LASSERT (!conn->ibc_disconnect);
2108                 conn->ibc_disconnect = 1;
2109
2110                 switch (conn->ibc_state) {
2111                 default:
2112                         LBUG();
2113
2114                 case IBNAL_CONN_ACTIVE_RTU:
2115                         /* kibnal_connreq_done is getting there; it'll see
2116                          * ibc_disconnect set... */
2117                         kibnal_conn_decref(conn); /* lose my ref */
2118                         break;
2119
2120                 case IBNAL_CONN_ESTABLISHED:
2121                         /* kibnal_connreq_done got there already; get
2122                          * disconnect going... */
2123                         kibnal_close_conn_locked(conn, 0);
2124                         kibnal_conn_decref(conn); /* lose my ref */
2125                         break;
2126
2127                 case IBNAL_CONN_DISCONNECT1:
2128                         /* kibnal_terminate_conn is getting there; it'll see
2129                          * ibc_disconnect set... */
2130                         kibnal_conn_decref(conn); /* lose my ref */
2131                         break;
2132
2133                 case IBNAL_CONN_DISCONNECT2:
2134                         /* kibnal_terminate_conn got there already; complete
2135                          * the disconnect.  NB kib_connd_conns takes my ref */
2136                         spin_lock(&kibnal_data.kib_connd_lock);
2137                         list_add_tail(&conn->ibc_list, &kibnal_data.kib_connd_conns);
2138                         wake_up(&kibnal_data.kib_connd_waitq);
2139                         spin_unlock(&kibnal_data.kib_connd_lock);
2140                         break;
2141                 }
2142                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2143                 return;
2144                 
2145         case cm_event_disconn_timeout:
2146         case cm_event_disconn_reply:
2147                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2148                 LASSERT (conn->ibc_state == IBNAL_CONN_DISCONNECT2);
2149                 LASSERT (!conn->ibc_disconnect);
2150                 conn->ibc_disconnect = 1;
2151
2152                 /* kibnal_terminate_conn sent the disconnect request.  
2153                  * NB kib_connd_conns takes my ref */
2154                 spin_lock(&kibnal_data.kib_connd_lock);
2155                 list_add_tail(&conn->ibc_list, &kibnal_data.kib_connd_conns);
2156                 wake_up(&kibnal_data.kib_connd_waitq);
2157                 spin_unlock(&kibnal_data.kib_connd_lock);
2158
2159                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2160                 break;
2161                 
2162         case cm_event_connected:
2163         case cm_event_conn_timeout:
2164         case cm_event_conn_reject:
2165                 LASSERT (conn->ibc_state == IBNAL_CONN_PASSIVE_WAIT);
2166                 conn->ibc_connvars->cv_conndata = *cmdata;
2167                 
2168                 spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2169                 list_add_tail(&conn->ibc_list, &kibnal_data.kib_connd_conns);
2170                 wake_up(&kibnal_data.kib_connd_waitq);
2171                 spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
2172                 break;
2173         }
2174 }
2175
2176 void
2177 kibnal_check_passive_wait(kib_conn_t *conn)
2178 {
2179         int     rc;
2180
2181         switch (conn->ibc_connvars->cv_conndata.status) {
2182         default:
2183                 LBUG();
2184                 
2185         case cm_event_connected:
2186                 kibnal_conn_addref(conn); /* ++ ref for CM callback */
2187                 rc = kibnal_set_qp_state(conn, vv_qp_state_rts);
2188                 if (rc != 0)
2189                         conn->ibc_comms_error = rc;
2190                 /* connection _has_ been established; it's just that we've had
2191                  * an error immediately... */
2192                 kibnal_connreq_done(conn, 0, 0);
2193                 break;
2194                 
2195         case cm_event_conn_timeout:
2196                 kibnal_connreq_done(conn, 0, -ETIMEDOUT);
2197                 break;
2198                 
2199         case cm_event_conn_reject:
2200                 kibnal_connreq_done(conn, 0, -ECONNRESET);
2201                 break;
2202         }
2203 }
2204
2205 void
2206 kibnal_recv_connreq(cm_cep_handle_t *cep, cm_request_data_t *cmreq)
2207 {
2208         static cm_reply_data_t  reply;
2209         static cm_reject_data_t reject;
2210
2211         kib_msg_t          *rxmsg = (kib_msg_t *)cmreq->priv_data;
2212         kib_msg_t          *txmsg;
2213         kib_conn_t         *conn = NULL;
2214         int                 rc = 0;
2215         kib_connvars_t     *cv;
2216         kib_peer_t         *tmp_peer;
2217         cm_return_t         cmrc;
2218         vv_return_t         vvrc;
2219         
2220         /* I'm the connd executing in thread context
2221          * No concurrency problems with static data! */
2222         LASSERT (!in_interrupt());
2223         LASSERT (current == kibnal_data.kib_connd);
2224
2225         if (cmreq->sid != IBNAL_SERVICE_NUMBER) {
2226                 CERROR(LPX64" != IBNAL_SERVICE_NUMBER("LPX64")\n",
2227                        cmreq->sid, (__u64)IBNAL_SERVICE_NUMBER);
2228                 goto reject;
2229         }
2230
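             /* The Portals handshake rides in the CM private data: unpack
              * the kib_msg_t and sanity-check its type, NIDs and connection
              * parameters before creating a conn and accepting. */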
2231         rc = kibnal_unpack_msg(rxmsg, cm_REQ_priv_data_len);
2232         if (rc != 0) {
2233                 CERROR("Can't parse connection request: %d\n", rc);
2234                 goto reject;
2235         }
2236
2237         if (rxmsg->ibm_type != IBNAL_MSG_CONNREQ) {
2238                 CERROR("Unexpected connreq msg type: %x from "LPX64"\n",
2239                        rxmsg->ibm_type, rxmsg->ibm_srcnid);
2240                 goto reject;
2241         }
2242
2243         if (rxmsg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid) {
2244                 CERROR("Can't accept "LPX64": bad dst nid "LPX64"\n",
2245                        rxmsg->ibm_srcnid, rxmsg->ibm_dstnid);
2246                 goto reject;
2247         }
2248
2249         if (rxmsg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
2250                 CERROR("Can't accept "LPX64": incompatible queue depth %d (%d wanted)\n",
2251                        rxmsg->ibm_srcnid, rxmsg->ibm_u.connparams.ibcp_queue_depth, 
2252                        IBNAL_MSG_QUEUE_SIZE);
2253                 goto reject;
2254         }
2255
2256         if (rxmsg->ibm_u.connparams.ibcp_max_msg_size > IBNAL_MSG_SIZE) {
2257                 CERROR("Can't accept "LPX64": message size %d too big (%d max)\n",
2258                        rxmsg->ibm_srcnid, rxmsg->ibm_u.connparams.ibcp_max_msg_size, 
2259                        IBNAL_MSG_SIZE);
2260                 goto reject;
2261         }
2262                 
2263         if (rxmsg->ibm_u.connparams.ibcp_max_frags > IBNAL_MAX_RDMA_FRAGS) {
2264                 CERROR("Can't accept "LPX64": max frags %d too big (%d max)\n",
2265                        rxmsg->ibm_srcnid, rxmsg->ibm_u.connparams.ibcp_max_frags, 
2266                        IBNAL_MAX_RDMA_FRAGS);
2267                 goto reject;
2268         }
2269                 
2270         conn = kibnal_create_conn(cep);
2271         if (conn == NULL) {
2272                 CERROR("Can't create conn for "LPX64"\n", rxmsg->ibm_srcnid);
2273                 goto reject;
2274         }
2275         
2276         /* assume 'rxmsg->ibm_srcnid' is a new peer */
2277         tmp_peer = kibnal_create_peer (rxmsg->ibm_srcnid);
2278         if (tmp_peer == NULL) {
2279                 CERROR("Can't create tmp peer for "LPX64"\n", rxmsg->ibm_srcnid);
2280                 kibnal_conn_decref(conn);
2281                 conn = NULL;
2282                 goto reject;
2283         }
2284
2285         conn->ibc_peer = tmp_peer;              /* conn takes over my ref */
2286         conn->ibc_incarnation = rxmsg->ibm_srcstamp;
2287         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2288
2289         cv = conn->ibc_connvars;
2290
2291         cv->cv_txpsn          = cmreq->cep_data.start_psn;
2292         cv->cv_remote_qpn     = cmreq->cep_data.qpn;
2293         cv->cv_path           = cmreq->path_data.path;
2294         cv->cv_rnr_count      = cmreq->cep_data.rtr_retry_cnt;
2295         // XXX                  cmreq->cep_data.retry_cnt;
2296         cv->cv_port           = cmreq->cep_data.local_port_num;
2297
2298         vvrc = gid2gid_index(kibnal_data.kib_hca, cv->cv_port,
2299                              &cv->cv_path.sgid, &cv->cv_sgid_index);
2300         LASSERT (vvrc == vv_return_ok);
2301         
2302         vvrc = pkey2pkey_index(kibnal_data.kib_hca, cv->cv_port,
2303                                cv->cv_path.pkey, &cv->cv_pkey_index);
2304         LASSERT (vvrc == vv_return_ok);
2305
2306         rc = kibnal_set_qp_state(conn, vv_qp_state_init);
2307         if (rc != 0)
2308                 goto reject;
2309
2310         rc = kibnal_post_receives(conn);
2311         if (rc != 0) {
2312                 CERROR("Can't post receives for "LPX64"\n", rxmsg->ibm_srcnid);
2313                 goto reject;
2314         }
2315
2316         rc = kibnal_set_qp_state(conn, vv_qp_state_rtr);
2317         if (rc != 0)
2318                 goto reject;
2319         
2320         memset(&reply, 0, sizeof(reply));
2321         reply.qpn                 = cv->cv_local_qpn;
2322         reply.qkey                = IBNAL_QKEY;
2323         reply.start_psn           = cv->cv_rxpsn;
2324         reply.arb_initiator_depth = IBNAL_ARB_INITIATOR_DEPTH;
2325         reply.arb_resp_res        = IBNAL_ARB_RESP_RES;
2326         reply.failover_accepted   = IBNAL_FAILOVER_ACCEPTED;
2327         reply.rnr_retry_count     = cv->cv_rnr_count;
2328         reply.targ_ack_delay      = kibnal_data.kib_hca_attrs.ack_delay;
2329         
2330         txmsg = (kib_msg_t *)&reply.priv_data;
2331         kibnal_init_msg(txmsg, IBNAL_MSG_CONNACK, 
2332                         sizeof(txmsg->ibm_u.connparams));
2333         LASSERT (txmsg->ibm_nob <= cm_REP_priv_data_len);
2334         txmsg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
2335         txmsg->ibm_u.connparams.ibcp_max_msg_size = IBNAL_MSG_SIZE;
2336         txmsg->ibm_u.connparams.ibcp_max_frags = IBNAL_MAX_RDMA_FRAGS;
2337         kibnal_pack_msg(txmsg, 0, rxmsg->ibm_srcnid, rxmsg->ibm_srcstamp);
2338         
2339         kibnal_set_conn_state(conn, IBNAL_CONN_PASSIVE_WAIT);
2340         
2341         cmrc = cm_accept(conn->ibc_cep, &reply, NULL,
2342                          kibnal_cm_callback, conn);
2343
2344         if (cmrc == cm_stat_success)
2345                 return;                         /* callback has got my ref on conn */
2346
2347         /* back out state change (no callback happening) */
2348         kibnal_set_conn_state(conn, IBNAL_CONN_INIT);
2349         rc = -EIO;
2350                 
2351  reject:
2352         CERROR("Rejected connreq from "LPX64"\n", rxmsg->ibm_srcnid);
2353
2354         memset(&reject, 0, sizeof(reject));
2355         reject.reason = cm_rej_code_usr_rej;
2356         cm_reject(cep, &reject);
2357
2358         if (conn != NULL) {
2359                 LASSERT (rc != 0);
2360                 kibnal_connreq_done(conn, 0, rc);
2361         } else {
2362                 cm_destroy_cep(cep);
2363         }
2364 }
2365
2366 void
2367 kibnal_listen_callback(cm_cep_handle_t cep, cm_conn_data_t *data, void *arg)
2368 {
2369         cm_request_data_t  *cmreq = &data->data.request;
2370         kib_pcreq_t        *pcr;
2371         unsigned long       flags;
2372         
2373         LASSERT (arg == NULL);
2374
2375         if (data->status != cm_event_conn_request) {
2376                 CERROR("status %d is not cm_event_conn_request\n",
2377                        data->status);
2378                 return;
2379         }
2380
2381         PORTAL_ALLOC_ATOMIC(pcr, sizeof(*pcr));
2382         if (pcr == NULL) {
2383                 CERROR("Can't allocate passive connreq\n");
2384
2385                 cm_reject(cep, &((cm_reject_data_t) /* NB RO struct */
2386                                  {.reason = cm_rej_code_no_res,}));
2387                 cm_destroy_cep(cep);
2388                 return;
2389         }
2390
2391         pcr->pcr_cep = cep;
2392         pcr->pcr_cmreq = *cmreq;
2393         
2394         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2395
2396         list_add_tail(&pcr->pcr_list, &kibnal_data.kib_connd_pcreqs);
2397         wake_up(&kibnal_data.kib_connd_waitq);
2398         
2399         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
2400 }
2401
2402
2403 void
2404 kibnal_active_connect_callback (cm_cep_handle_t cep, cm_conn_data_t *cd, 
2405                                 void *arg)
2406 {
2407         /* CAVEAT EMPTOR: tasklet context */
2408         kib_conn_t       *conn = (kib_conn_t *)arg;
2409         kib_connvars_t   *cv = conn->ibc_connvars;
2410         unsigned long     flags;
2411
2412         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_CONNECT);
2413         cv->cv_conndata = *cd;
2414
2415         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2416         /* connd takes my ref */
2417         list_add_tail(&conn->ibc_list, &kibnal_data.kib_connd_conns);
2418         wake_up(&kibnal_data.kib_connd_waitq);
2419         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
2420 }
2421
2422 void
2423 kibnal_connect_conn (kib_conn_t *conn)
2424 {
2425         static cm_request_data_t  cmreq;
2426         kib_msg_t                *msg = (kib_msg_t *)&cmreq.priv_data;
2427         kib_connvars_t           *cv = conn->ibc_connvars;
2428         kib_peer_t               *peer = conn->ibc_peer;
2429         cm_return_t               cmrc;
2430         
2431         /* Only called by connd => statics OK */
2432         LASSERT (!in_interrupt());
2433         LASSERT (current == kibnal_data.kib_connd);
2434         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_ARP);
2435
2436         memset(&cmreq, 0, sizeof(cmreq));
2437         
2438         cmreq.sid = IBNAL_SERVICE_NUMBER;
2439
2440         cmreq.cep_data.ca_guid              = kibnal_data.kib_hca_attrs.guid;
2441         cmreq.cep_data.qpn                  = cv->cv_local_qpn;
2442         cmreq.cep_data.retry_cnt            = IBNAL_RETRY_CNT;
2443         cmreq.cep_data.rtr_retry_cnt        = IBNAL_RNR_CNT;
2444         cmreq.cep_data.start_psn            = cv->cv_rxpsn;
2445         cmreq.cep_data.end_to_end_flow_ctrl = IBNAL_EE_FLOW_CNT;
2446         // XXX ack_timeout?
2447         // offered_resp_res
2448         // offered_initiator_depth
2449
2450         cmreq.path_data.subn_local  = IBNAL_LOCAL_SUB;
2451         cmreq.path_data.path        = cv->cv_path;
2452         
2453         kibnal_init_msg(msg, IBNAL_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
2454         LASSERT(msg->ibm_nob <= cm_REQ_priv_data_len);
2455         msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
2456         msg->ibm_u.connparams.ibcp_max_msg_size = IBNAL_MSG_SIZE;
2457         msg->ibm_u.connparams.ibcp_max_frags = IBNAL_MAX_RDMA_FRAGS;
2458         kibnal_pack_msg(msg, 0, peer->ibp_nid, 0);
2459         
2460         CDEBUG(D_NET, "Connecting %p to "LPX64"\n", conn, peer->ibp_nid);
2461
2462         kibnal_conn_addref(conn);               /* ++ref for CM callback */
2463         kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_CONNECT);
2464
2465         cmrc = cm_connect(conn->ibc_cep, &cmreq, 
2466                           kibnal_active_connect_callback, conn);
2467         if (cmrc == cm_stat_success) {
2468                 CDEBUG(D_NET, "connection REQ sent to "LPX64"\n",
2469                        peer->ibp_nid);
2470                 return;
2471         }
2472
2473         CERROR ("Connect "LPX64" failed: %d\n", peer->ibp_nid, cmrc);
2474         kibnal_conn_decref(conn);       /* drop callback's ref */
2475         kibnal_connreq_done(conn, 1, -EHOSTUNREACH);
2476 }
2477
2478 void
2479 kibnal_check_connreply (kib_conn_t *conn)
2480 {
2481         static cm_rtu_data_t  rtu;
2482
2483         kib_connvars_t   *cv = conn->ibc_connvars;
2484         cm_reply_data_t  *reply = &cv->cv_conndata.data.reply;
2485         kib_msg_t        *msg = (kib_msg_t *)&reply->priv_data;
2486         kib_peer_t       *peer = conn->ibc_peer;
2487         cm_return_t       cmrc;
2488         cm_cep_handle_t   cep;
2489         unsigned long     flags;
2490         int               rc;
2491
2492         /* Only called by connd => statics OK */
2493         LASSERT (!in_interrupt());
2494         LASSERT (current == kibnal_data.kib_connd);
2495         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_CONNECT);
2496
2497         if (cv->cv_conndata.status == cm_event_conn_reply) {
2498                 cv->cv_remote_qpn = reply->qpn;
2499                 cv->cv_txpsn      = reply->start_psn;
2500                 // XXX              reply->targ_ack_delay;
2501                 cv->cv_rnr_count  = reply->rnr_retry_count;
2502
2503                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_CHECK_REPLY);
2504
2505                 rc = kibnal_unpack_msg(msg, cm_REP_priv_data_len);
2506                 if (rc != 0) {
2507                         CERROR("Can't unpack reply from "LPX64"\n",
2508                                peer->ibp_nid);
2509                         kibnal_connreq_done(conn, 1, rc);
2510                         return;
2511                 }
2512
2513                 if (msg->ibm_type != IBNAL_MSG_CONNACK ) {
2514                         CERROR("Unexpected message type %d from "LPX64"\n",
2515                                msg->ibm_type, peer->ibp_nid);
2516                         kibnal_connreq_done(conn, 1, -EPROTO);
2517                         return;
2518                 }
2519
2520                 if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
2521                         CERROR(LPX64" has incompatible queue depth %d (%d wanted)\n",
2522                                peer->ibp_nid, msg->ibm_u.connparams.ibcp_queue_depth,
2523                                IBNAL_MSG_QUEUE_SIZE);
2524                         kibnal_connreq_done(conn, 1, -EPROTO);
2525                         return;
2526                 }
2527                 
2528                 if (msg->ibm_u.connparams.ibcp_max_msg_size > IBNAL_MSG_SIZE) {
2529                         CERROR(LPX64" max message size %d too big (%d max)\n",
2530                                peer->ibp_nid, msg->ibm_u.connparams.ibcp_max_msg_size, 
2531                                IBNAL_MSG_SIZE);
2532                         kibnal_connreq_done(conn, 1, -EPROTO);
2533                         return;
2534                 }
2535
2536                 if (msg->ibm_u.connparams.ibcp_max_frags > IBNAL_MAX_RDMA_FRAGS) {
2537                         CERROR(LPX64" max frags %d too big (%d max)\n",
2538                                peer->ibp_nid, msg->ibm_u.connparams.ibcp_max_frags, 
2539                                IBNAL_MAX_RDMA_FRAGS);
2540                         kibnal_connreq_done(conn, 1, -EPROTO);
2541                         return;
2542                 }
2543                 
2544                 read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2545                 rc = (msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid ||
2546                       msg->ibm_dststamp != kibnal_data.kib_incarnation) ?
2547                      -ESTALE : 0;
2548                 read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2549                 if (rc != 0) {
2550                         CERROR("Stale connection reply from "LPX64"\n",
2551                                peer->ibp_nid);
2552                         kibnal_connreq_done(conn, 1, rc);
2553                         return;
2554                 }
2555
2556                 conn->ibc_incarnation = msg->ibm_srcstamp;
2557                 conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2558                 
2559                 rc = kibnal_post_receives(conn);
2560                 if (rc != 0) {
2561                         CERROR("Can't post receives for "LPX64"\n",
2562                                peer->ibp_nid);
2563                         kibnal_connreq_done(conn, 1, rc);
2564                         return;
2565                 }
2566                 
2567                 rc = kibnal_set_qp_state(conn, vv_qp_state_rtr);
2568                 if (rc != 0) {
2569                         kibnal_connreq_done(conn, 1, rc);
2570                         return;
2571                 }
2572                 
2573                 rc = kibnal_set_qp_state(conn, vv_qp_state_rts);
2574                 if (rc != 0) {
2575                         kibnal_connreq_done(conn, 1, rc);
2576                         return;
2577                 }
2578                 
2579                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_RTU);
2580                 kibnal_conn_addref(conn);       /* ++for CM callback */
2581                 
2582                 memset(&rtu, 0, sizeof(rtu));
2583                 cmrc = cm_accept(conn->ibc_cep, NULL, &rtu,
2584                                  kibnal_cm_callback, conn);
2585                 if (cmrc == cm_stat_success) {
2586                         /* Now I'm racing with disconnect signalled by
2587                          * kibnal_cm_callback */
2588                         kibnal_connreq_done(conn, 1, 0);
2589                         return;
2590                 }
2591
2592                 CERROR("cm_accept "LPX64" failed: %d\n", peer->ibp_nid, cmrc);
2593                 /* Back out of RTU: no callback coming */
2594                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_CHECK_REPLY);
2595                 kibnal_conn_decref(conn);
2596                 kibnal_connreq_done(conn, 1, -EIO);
2597                 return;
2598         }
2599
2600         if (cv->cv_conndata.status == cm_event_conn_reject) {
2601
2602                 if (cv->cv_conndata.data.reject.reason != cm_rej_code_stale_conn) {
2603                         CERROR("conn -> "LPX64" rejected: %d\n", peer->ibp_nid,
2604                                cv->cv_conndata.data.reject.reason);
2605                         kibnal_connreq_done(conn, 1, -ECONNREFUSED);
2606                         return;
2607                 }
2608
2609                 CWARN ("conn -> "LPX64" stale: retrying\n", peer->ibp_nid);
2610
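                     /* A stale-conn reject presumably means the peer still
                      * holds state from an earlier incarnation.  The old CEP
                      * is bound to the rejected attempt, so swap in a fresh
                      * one and retry the connect from the ARP-complete
                      * state. */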
2611                 cep = cm_create_cep(cm_cep_transp_rc);
2612                 if (cep == NULL) {
2613                         CERROR("Can't create new CEP\n");
2614                         kibnal_connreq_done(conn, 1, -ENOMEM);
2615                         return;
2616                 }
2617
2618                 cmrc = cm_cancel(conn->ibc_cep);
2619                 LASSERT (cmrc == cm_stat_success);
2620                 cmrc = cm_destroy_cep(conn->ibc_cep);
2621                 LASSERT (cmrc == cm_stat_success);
2622
2623                 conn->ibc_cep = cep;
2624
2625                 /* retry connect */
2626                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_ARP);
2627                 kibnal_connect_conn(conn);
2628                 return;
2629         }
2630
2631         CERROR("conn -> "LPX64" failed: %d\n", peer->ibp_nid,
2632                cv->cv_conndata.status);
2633         kibnal_connreq_done(conn, 1, -ECONNABORTED);
2634 }
2635
2636 void
2637 kibnal_send_connreq (kib_conn_t *conn)
2638 {
2639         kib_peer_t           *peer = conn->ibc_peer;
2640         kib_connvars_t       *cv = conn->ibc_connvars;
2641         ibat_arp_data_t      *arp = &cv->cv_arp;
2642         ib_path_record_v2_t  *path = &cv->cv_path;
2643         vv_return_t           vvrc;
2644         int                   rc;
2645
2646         /* Only called by connd => statics OK */
2647         LASSERT (!in_interrupt());
2648         LASSERT (current == kibnal_data.kib_connd);
2649         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_ARP);
2650         
2651         if (cv->cv_arprc != ibat_stat_ok) {
2652                 CERROR("Can't Arp "LPX64"@%u.%u.%u.%u: %d\n", peer->ibp_nid,
2653                        HIPQUAD(peer->ibp_ip), cv->cv_arprc);
2654                 kibnal_connreq_done(conn, 1, -ENETUNREACH);
2655                 return;
2656         }
2657
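             /* The ARP result either carries a complete primary path record,
              * or just the peer's LID, in which case a path record is built
              * by hand below from compile-time defaults. */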
2658         if ((arp->mask & IBAT_PRI_PATH_VALID) != 0) {
2659                 CDEBUG(D_NET, "Got valid path for "LPX64"\n", peer->ibp_nid);
2660
2661                 *path = *arp->primary_path;
2662
2663                 vvrc = base_gid2port_num(kibnal_data.kib_hca, &path->sgid,
2664                                          &cv->cv_port);
2665                 LASSERT (vvrc == vv_return_ok);
2666
2667                 vvrc = gid2gid_index(kibnal_data.kib_hca, cv->cv_port,
2668                                      &path->sgid, &cv->cv_sgid_index);
2669                 LASSERT (vvrc == vv_return_ok);
2670
2671                 vvrc = pkey2pkey_index(kibnal_data.kib_hca, cv->cv_port,
2672                                        path->pkey, &cv->cv_pkey_index);
2673                 LASSERT (vvrc == vv_return_ok);
2674
2675                 path->mtu = IBNAL_IB_MTU;
2676
2677         } else if ((arp->mask & IBAT_LID_VALID) != 0) {
2678                 CWARN("Creating new path record for "LPX64"@%u.%u.%u.%u\n",
2679                       peer->ibp_nid, HIPQUAD(peer->ibp_ip));
2680
2681                 cv->cv_pkey_index = IBNAL_PKEY_IDX;
2682                 cv->cv_sgid_index = IBNAL_SGID_IDX;
2683                 cv->cv_port = arp->local_port_num;
2684
2685                 memset(path, 0, sizeof(*path));
2686
2687                 vvrc = port_num2base_gid(kibnal_data.kib_hca, cv->cv_port,
2688                                          &path->sgid);
2689                 LASSERT (vvrc == vv_return_ok);
2690
2691                 vvrc = port_num2base_lid(kibnal_data.kib_hca, cv->cv_port,
2692                                          &path->slid);
2693                 LASSERT (vvrc == vv_return_ok);
2694
2695                 path->dgid          = arp->gid;
2696                 path->sl            = IBNAL_SERVICE_LEVEL;
2697                 path->dlid          = arp->lid;
2698                 path->mtu           = IBNAL_IB_MTU;
2699                 path->rate          = IBNAL_STATIC_RATE;
2700                 path->pkt_life_time = IBNAL_PKT_LIFETIME;
2701                 path->pkey          = IBNAL_PKEY;
2702                 path->traffic_class = IBNAL_TRAFFIC_CLASS;
2703         } else {
2704                 CERROR("Can't Arp "LPX64"@%u.%u.%u.%u: no PATH or LID\n", 
2705                        peer->ibp_nid, HIPQUAD(peer->ibp_ip));
2706                 kibnal_connreq_done(conn, 1, -ENETUNREACH);
2707                 return;
2708         }
2709
2710         rc = kibnal_set_qp_state(conn, vv_qp_state_init);
2711         if (rc != 0) {
2712                 kibnal_connreq_done(conn, 1, rc);
2713         }
2714
2715         /* do the actual connection request */
2716         kibnal_connect_conn(conn);
2717 }
2718
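/* ARP completion callback: record the result and queue the conn for
 * the connd, which finishes the job in kibnal_send_connreq(). */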
void
kibnal_arp_callback (ibat_stat_t arprc, ibat_arp_data_t *arp_data, void *arg)
{
        /* CAVEAT EMPTOR: tasklet context */
        kib_conn_t      *conn = (kib_conn_t *)arg;
        kib_peer_t      *peer = conn->ibc_peer;
        unsigned long    flags;

        CDEBUG(D_NET, "Arp "LPX64"@%u.%u.%u.%u rc %d LID %s PATH %s\n",
               peer->ibp_nid, HIPQUAD(peer->ibp_ip), arprc,
               (arp_data->mask & IBAT_LID_VALID) == 0 ? "invalid" : "valid",
               (arp_data->mask & IBAT_PRI_PATH_VALID) == 0 ? "invalid" : "valid");
        LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_ARP);

        conn->ibc_connvars->cv_arprc = arprc;
        if (arprc == ibat_stat_ok)
                conn->ibc_connvars->cv_arp = *arp_data;

        /* connd takes over my ref on conn */
        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);

        list_add_tail(&conn->ibc_list, &kibnal_data.kib_connd_conns);
        wake_up(&kibnal_data.kib_connd_waitq);

        spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
}
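/* Begin active connection establishment to 'peer': create a CM
 * endpoint and a conn, then start an asynchronous ARP lookup of the
 * peer's IP to resolve its IB address. */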
void
kibnal_arp_peer (kib_peer_t *peer)
{
        cm_cep_handle_t  cep;
        kib_conn_t      *conn;
        int              ibatrc;

        /* Only the connd does this (i.e. single threaded) */
        LASSERT (current == kibnal_data.kib_connd);
        LASSERT (peer->ibp_connecting != 0);

        cep = cm_create_cep(cm_cep_transp_rc);
        if (cep == NULL) {
                CERROR ("Can't create cep for conn->"LPX64"\n",
                        peer->ibp_nid);
                kibnal_peer_connect_failed(peer, 1);
                return;
        }

        conn = kibnal_create_conn(cep);
        if (conn == NULL) {
                CERROR ("Can't allocate conn->"LPX64"\n",
                        peer->ibp_nid);
                cm_destroy_cep(cep);
                kibnal_peer_connect_failed(peer, 1);
                return;
        }

        conn->ibc_peer = peer;
        kibnal_peer_addref(peer);

        kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_ARP);

        ibatrc = ibat_get_ib_data(htonl(peer->ibp_ip), INADDR_ANY,
                                  ibat_paths_primary,
                                  &conn->ibc_connvars->cv_arp,
                                  kibnal_arp_callback, conn, 0);
        CDEBUG(D_NET,"ibatrc %d\n", ibatrc);
        switch (ibatrc) {
        default:
                LBUG();

        case ibat_stat_pending:
                /* NB callback has my ref on conn */
                break;

        case ibat_stat_ok:
                /* Immediate return (ARP cache hit) == no callback, so
                 * record the result here as the callback would have */
                conn->ibc_connvars->cv_arprc = ibat_stat_ok;
                kibnal_send_connreq(conn);
                kibnal_conn_decref(conn);
                break;

        case ibat_stat_error:
        case ibat_stat_timeout:
        case ibat_stat_not_found:
                CERROR("Arp "LPX64"@%u.%u.%u.%u failed: %d\n", peer->ibp_nid,
                       HIPQUAD(peer->ibp_ip), ibatrc);
                kibnal_connreq_done(conn, 1, -ENETUNREACH);
                kibnal_conn_decref(conn);
                break;
        }
}
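/* Return non-zero if any tx queued on, or active on, 'conn' has
 * passed its deadline. */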
int
kibnal_conn_timed_out (kib_conn_t *conn)
{
        kib_tx_t          *tx;
        struct list_head  *ttmp;

        spin_lock(&conn->ibc_lock);

        list_for_each (ttmp, &conn->ibc_tx_queue) {
                tx = list_entry (ttmp, kib_tx_t, tx_list);

                if (time_after_eq (jiffies, tx->tx_deadline)) {
                        spin_unlock(&conn->ibc_lock);
                        return 1;
                }
        }

        list_for_each (ttmp, &conn->ibc_active_txs) {
                tx = list_entry (ttmp, kib_tx_t, tx_list);

                LASSERT (tx->tx_waiting ||
                         tx->tx_sending != 0);

                if (time_after_eq (jiffies, tx->tx_deadline)) {
                        spin_unlock(&conn->ibc_lock);
                        return 1;
                }
        }

        spin_unlock(&conn->ibc_lock);
        return 0;
}
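/* Sweep peer hash bucket 'idx' looking for connections with
 * timed-out transmits and close any found.  The global lock must be
 * dropped before closing, so the scan restarts from the top of the
 * bucket each time it finds one. */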
void
kibnal_check_conns (int idx)
{
        struct list_head  *peers = &kibnal_data.kib_peers[idx];
        struct list_head  *ptmp;
        kib_peer_t        *peer;
        kib_conn_t        *conn;
        struct list_head  *ctmp;
        unsigned long      flags;

 again:
        /* NB. We expect to have a look at all the peers and not find any
         * rdmas to time out, so we just use a shared lock while we
         * take a look... */
        read_lock_irqsave(&kibnal_data.kib_global_lock, flags);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, kib_peer_t, ibp_list);

                list_for_each (ctmp, &peer->ibp_conns) {
                        conn = list_entry (ctmp, kib_conn_t, ibc_list);

                        LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);

                        /* In case we have enough credits to return via a
                         * NOOP, but there were no non-blocking tx descs
                         * free to do it last time... */
                        kibnal_check_sends(conn);

                        if (!kibnal_conn_timed_out(conn))
                                continue;

                        /* Handle timeout by closing the whole connection.  We
                         * can only be sure RDMA activity has ceased once the
                         * QP has been modified. */

                        kibnal_conn_addref(conn); /* 1 ref for me... */

                        read_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                               flags);

                        CERROR("Timed out RDMA with "LPX64"\n",
                               peer->ibp_nid);

                        kibnal_close_conn (conn, -ETIMEDOUT);
                        kibnal_conn_decref(conn); /* ...until here */

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
}
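/* Run the active side of the disconnect handshake for 'conn'.  If
 * the CM callback has already happened there is nothing left to do
 * but clean up; otherwise ask the CM to disconnect and, if that
 * fails, cancel the CEP and finalise locally. */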
void
kibnal_disconnect_conn (kib_conn_t *conn)
{
        static cm_drequest_data_t dreq;         /* just for the space */

        cm_return_t    cmrc;
        unsigned long  flags;

        LASSERT (!in_interrupt());
        LASSERT (current == kibnal_data.kib_connd);

        write_lock_irqsave(&kibnal_data.kib_global_lock, flags);

        if (conn->ibc_disconnect) {
                /* Had the CM callback already */
                write_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                        flags);
                kibnal_conn_disconnected(conn);
                return;
        }

        LASSERT (conn->ibc_state == IBNAL_CONN_DISCONNECT1);

        /* active disconnect */
        cmrc = cm_disconnect(conn->ibc_cep, &dreq, NULL);
        if (cmrc == cm_stat_success) {
                /* waiting for CM */
                conn->ibc_state = IBNAL_CONN_DISCONNECT2;
                write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
                return;
        }

        write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);

        cm_cancel(conn->ibc_cep);
        kibnal_pause(HZ/10);

        if (!conn->ibc_disconnect)              /* CM callback will never happen now */
                kibnal_conn_decref(conn);

        LASSERT (atomic_read(&conn->ibc_refcount) > 0);
        LASSERT (conn->ibc_state == IBNAL_CONN_DISCONNECT1);

        kibnal_conn_disconnected(conn);
}
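/* The connection daemon: a single thread that reaps zombie
 * connections, accepts queued passive connection requests, starts
 * ARP for peers waiting to connect, steps conn state machines and
 * periodically sweeps the peer table for timed-out RDMAs. */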
int
kibnal_connd (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        kib_pcreq_t       *pcr;
        kib_conn_t        *conn;
        kib_peer_t        *peer;
        int                timeout;
        int                i;
        int                dropped_lock;
        int                peer_index = 0;
        unsigned long      deadline = jiffies;

        kportal_daemonize ("kibnal_connd");
        kportal_blockallsigs ();

        init_waitqueue_entry (&wait, current);
        kibnal_data.kib_connd = current;

        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);

        while (!kibnal_data.kib_shutdown) {

                dropped_lock = 0;

                if (!list_empty (&kibnal_data.kib_connd_zombies)) {
                        conn = list_entry (kibnal_data.kib_connd_zombies.next,
                                           kib_conn_t, ibc_list);
                        list_del (&conn->ibc_list);

                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
                        dropped_lock = 1;

                        kibnal_destroy_conn(conn);

                        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                }

                if (!list_empty (&kibnal_data.kib_connd_pcreqs)) {
                        pcr = list_entry(kibnal_data.kib_connd_pcreqs.next,
                                         kib_pcreq_t, pcr_list);
                        list_del(&pcr->pcr_list);

                        spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
                        dropped_lock = 1;

                        kibnal_recv_connreq(pcr->pcr_cep, &pcr->pcr_cmreq);
                        PORTAL_FREE(pcr, sizeof(*pcr));

                        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
                }

                if (!list_empty (&kibnal_data.kib_connd_peers)) {
                        peer = list_entry (kibnal_data.kib_connd_peers.next,
                                           kib_peer_t, ibp_connd_list);

                        list_del_init (&peer->ibp_connd_list);
                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
                        dropped_lock = 1;

                        kibnal_arp_peer (peer);
                        kibnal_peer_decref (peer);

                        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                }

                if (!list_empty (&kibnal_data.kib_connd_conns)) {
                        conn = list_entry (kibnal_data.kib_connd_conns.next,
                                           kib_conn_t, ibc_list);
                        list_del (&conn->ibc_list);

                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
                        dropped_lock = 1;

                        switch (conn->ibc_state) {
                        default:
                                LBUG();

                        case IBNAL_CONN_ACTIVE_ARP:
                                kibnal_send_connreq(conn);
                                break;

                        case IBNAL_CONN_ACTIVE_CONNECT:
                                kibnal_check_connreply(conn);
                                break;

                        case IBNAL_CONN_PASSIVE_WAIT:
                                kibnal_check_passive_wait(conn);
                                break;

                        case IBNAL_CONN_DISCONNECT1:
                        case IBNAL_CONN_DISCONNECT2:
                                kibnal_disconnect_conn(conn);
                                break;
                        }
                        kibnal_conn_decref(conn);

                        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                }

                /* careful with the jiffy wrap... */
                timeout = (int)(deadline - jiffies);
                if (timeout <= 0) {
                        const int n = 4;
                        const int p = 1;
                        int       chunk = kibnal_data.kib_peer_hash_size;

                        spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
                        dropped_lock = 1;

                        /* Time to check for RDMA timeouts on a few more
                         * peers: I do checks every 'p' seconds on a
                         * proportion of the peer table and I need to check
                         * every connection 'n' times within a timeout
                         * interval, to ensure I detect a timeout on any
                         * connection within (n+1)/n times the timeout
                         * interval. */

                        if (kibnal_tunables.kib_io_timeout > n * p)
                                chunk = (chunk * n * p) /
                                        kibnal_tunables.kib_io_timeout;
                        if (chunk == 0)
                                chunk = 1;
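                        /* For illustration only: with kib_io_timeout
                         * of 60s, a 101-bucket peer table, n == 4 and
                         * p == 1, chunk is 101 * 4 * 1 / 60 == 6
                         * buckets per pass, so the whole table gets
                         * swept in ceil(101/6) == 17 one-second
                         * passes, i.e. within roughly timeout/n. */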
                        for (i = 0; i < chunk; i++) {
                                kibnal_check_conns (peer_index);
                                peer_index = (peer_index + 1) %
                                             kibnal_data.kib_peer_hash_size;
                        }

                        deadline += p * HZ;
                        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
                }

                if (dropped_lock)
                        continue;

                /* Nothing to do: sleep for 'timeout' or until woken */
                set_current_state (TASK_INTERRUPTIBLE);
                add_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
                spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

                schedule_timeout (timeout);

                set_current_state (TASK_RUNNING);
                remove_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
                spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
        }

        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

        kibnal_thread_fini ();
        return (0);
}
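/* HCA asynchronous event handler: no recovery is attempted, the
 * event is just logged. */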
void
kibnal_async_callback(vv_event_record_t ev)
{
        CERROR("type: %d, port: %d, data: "LPX64"\n",
               ev.event_type, ev.port_num, ev.type.data);
}
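/* Completion queue event handler: just flag work and wake a
 * scheduler; the CQ is only ever polled from thread context. */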
void
kibnal_cq_callback (unsigned long unused_context)
{
        unsigned long    flags;

        CDEBUG(D_NET, "!!\n");

        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
        kibnal_data.kib_ready = 1;
        wake_up(&kibnal_data.kib_sched_waitq);
        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
}
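/* Scheduler thread: poll the completion queue and dispatch each work
 * completion to the rx/tx handlers.  Only one scheduler owns the CQ
 * at a time (kib_checking_cq), and the CQ is re-armed only after it
 * has been polled empty, so no completion event can be lost. */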
int
kibnal_scheduler(void *arg)
{
        long            id = (long)arg;
        wait_queue_t    wait;
        char            name[16];
        vv_wc_t         wc;
        vv_return_t     vvrc;
        vv_return_t     vvrc2;
        unsigned long   flags;
        int             busy_loops = 0;

        snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
        kportal_daemonize(name);
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);

        while (!kibnal_data.kib_shutdown) {
                if (busy_loops++ >= IBNAL_RESCHED) {
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);

                        our_cond_resched();
                        busy_loops = 0;

                        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
                }

                if (kibnal_data.kib_ready &&
                    !kibnal_data.kib_checking_cq) {
                        /* take ownership of completion polling */
                        kibnal_data.kib_checking_cq = 1;
                        /* Assume I'll exhaust the CQ */
                        kibnal_data.kib_ready = 0;
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);

                        vvrc = vv_poll_for_completion(kibnal_data.kib_hca,
                                                      kibnal_data.kib_cq, &wc);
                        if (vvrc == vv_return_err_cq_empty) {
                                vvrc2 = vv_request_completion_notification(
                                        kibnal_data.kib_hca,
                                        kibnal_data.kib_cq,
                                        vv_next_solicit_unsolicit_event);
                                LASSERT (vvrc2 == vv_return_ok);
                        }

                        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
                        /* give up ownership of completion polling */
                        kibnal_data.kib_checking_cq = 0;

                        if (vvrc == vv_return_err_cq_empty)
                                continue;

                        LASSERT (vvrc == vv_return_ok);
                        /* Assume there's more: get another scheduler to check
                         * while I handle this completion... */

                        kibnal_data.kib_ready = 1;
                        wake_up(&kibnal_data.kib_sched_waitq);

                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);

                        switch (kibnal_wreqid2type(wc.wr_id)) {
                        case IBNAL_WID_RX:
                                kibnal_rx_complete(
                                        (kib_rx_t *)kibnal_wreqid2ptr(wc.wr_id),
                                        wc.completion_status,
                                        wc.num_bytes_transfered);
                                break;

                        case IBNAL_WID_TX:
                                kibnal_tx_complete(
                                        (kib_tx_t *)kibnal_wreqid2ptr(wc.wr_id),
                                        wc.completion_status);
                                break;

                        case IBNAL_WID_RDMA:
                                /* We only get RDMA completion notification if
                                 * it fails.  So we just ignore them completely
                                 * because...
                                 *
                                 * 1) If an RDMA fails, all subsequent work
                                 * items, including the final SEND will fail
                                 * too, so I'm still guaranteed to notice that
                                 * this connection is hosed.
                                 *
                                 * 2) It's positively dangerous to look inside
                                 * the tx descriptor obtained from an RDMA work
                                 * item.  As soon as I drop the kib_sched_lock,
                                 * I give a scheduler on another CPU a chance
                                 * to get the final SEND completion, so the tx
                                 * descriptor can get freed as I inspect it. */
                                CERROR ("RDMA failed: %d\n",
                                        wc.completion_status);
                                break;

                        default:
                                LBUG();
                        }

                        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
                        continue;
                }

                /* Nothing to do; sleep... */

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&kibnal_data.kib_sched_waitq, &wait);
                spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                       flags);

                schedule();

                remove_wait_queue(&kibnal_data.kib_sched_waitq, &wait);
                set_current_state(TASK_RUNNING);
                spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
        }

        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);

        kibnal_thread_fini();
        return (0);
}
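/* Method table plugging this NAL into the portals library. */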
lib_nal_t kibnal_lib = {
        .libnal_data = &kibnal_data,      /* NAL private data */
        .libnal_send = kibnal_send,
        .libnal_send_pages = kibnal_send_pages,
        .libnal_recv = kibnal_recv,
        .libnal_recv_pages = kibnal_recv_pages,
        .libnal_dist = kibnal_dist
};