/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *   Author: Frank Zago <fzago@systemfabricworks.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

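/* Transmit/receive callback paths for the Voltaire InfiniBand NAL
 * ("vibnal"): tx descriptor pooling, rx buffer posting, credit-based flow
 * control, RDMA fragment mapping, and the PUT/GET completion handshakes. */
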
#include "vibnal.h"

void
kibnal_tx_done (kib_tx_t *tx)
{
        ptl_err_t        ptlrc = (tx->tx_status == 0) ? PTL_OK : PTL_FAIL;
        int              i;

        LASSERT (!in_interrupt());
        LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting callback */
        LASSERT (!tx->tx_waiting);              /* mustn't be awaiting peer response */

#if !IBNAL_WHOLE_MEM
        switch (tx->tx_mapped) {
        default:
                LBUG();

        case KIB_TX_UNMAPPED:
                break;

        case KIB_TX_MAPPED: {
                vv_return_t      vvrc;

                vvrc = vv_mem_region_destroy(kibnal_data.kib_hca,
                                             tx->tx_md.md_handle);
                LASSERT (vvrc == vv_return_ok);
                tx->tx_mapped = KIB_TX_UNMAPPED;
                break;
        }
        }
#endif
        for (i = 0; i < 2; i++) {
                /* tx may have up to 2 libmsgs to finalise */
                if (tx->tx_libmsg[i] == NULL)
                        continue;

                lib_finalize (&kibnal_lib, NULL, tx->tx_libmsg[i], ptlrc);
                tx->tx_libmsg[i] = NULL;
        }

        if (tx->tx_conn != NULL) {
                kibnal_conn_decref(tx->tx_conn);
                tx->tx_conn = NULL;
        }

        tx->tx_nwrq = 0;
        tx->tx_status = 0;

        spin_lock(&kibnal_data.kib_tx_lock);

        if (tx->tx_isnblk) {
                list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_nblk_txs);
        } else {
                list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_txs);
                wake_up (&kibnal_data.kib_idle_tx_waitq);
        }

        spin_unlock(&kibnal_data.kib_tx_lock);
}

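/* Idle tx descriptors live on two pools: kib_idle_txs, on which callers that
 * may sleep will block until a descriptor frees up, and kib_idle_nblk_txs, a
 * small reserve for callers that must not block (e.g. completions sent from
 * the scheduler).  kibnal_tx_done() above returns a descriptor to the pool
 * it was drawn from (tx_isnblk); kibnal_get_idle_tx() below takes one out. */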
kib_tx_t *
kibnal_get_idle_tx (int may_block)
{
        kib_tx_t      *tx = NULL;
        ENTRY;

        for (;;) {
                spin_lock(&kibnal_data.kib_tx_lock);

                /* "normal" descriptor is free */
                if (!list_empty (&kibnal_data.kib_idle_txs)) {
                        tx = list_entry (kibnal_data.kib_idle_txs.next,
                                         kib_tx_t, tx_list);
                        break;
                }

                if (!may_block) {
                        /* may dip into reserve pool */
                        if (list_empty (&kibnal_data.kib_idle_nblk_txs)) {
                                CERROR ("reserved tx desc pool exhausted\n");
                                break;
                        }

                        tx = list_entry (kibnal_data.kib_idle_nblk_txs.next,
                                         kib_tx_t, tx_list);
                        break;
                }

                /* block for idle tx */
                spin_unlock(&kibnal_data.kib_tx_lock);

                wait_event (kibnal_data.kib_idle_tx_waitq,
                            !list_empty (&kibnal_data.kib_idle_txs) ||
                            kibnal_data.kib_shutdown);
        }

        if (tx != NULL) {
                list_del (&tx->tx_list);

                /* Allocate a new completion cookie.  It might not be needed,
                 * but we've got a lock right now and we're unlikely to
                 * wrap... */
                tx->tx_cookie = kibnal_data.kib_next_tx_cookie++;
#if !IBNAL_WHOLE_MEM
                /* tx_mapped only exists when txs are mapped on demand; see
                 * the matching #if !IBNAL_WHOLE_MEM in kibnal_tx_done() */
                LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
#endif
                LASSERT (tx->tx_nwrq == 0);
                LASSERT (tx->tx_sending == 0);
                LASSERT (!tx->tx_waiting);
                LASSERT (tx->tx_status == 0);
                LASSERT (tx->tx_conn == NULL);
                LASSERT (tx->tx_libmsg[0] == NULL);
                LASSERT (tx->tx_libmsg[1] == NULL);
        }

        spin_unlock(&kibnal_data.kib_tx_lock);

        RETURN(tx);
}

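/* Each successfully posted rx holds a reference on its connection (taken by
 * the caller; see kibnal_post_receives() below).  kibnal_post_rx() drops
 * that reference instead of re-posting once the connection is past
 * ESTABLISHED, and on posting failure. */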
int
kibnal_post_rx (kib_rx_t *rx, int credit)
{
        kib_conn_t   *conn = rx->rx_conn;
        int           rc = 0;
        vv_return_t   vvrc;

        LASSERT (!in_interrupt());

        rx->rx_gl = (vv_scatgat_t) {
                .v_address = (void *)((unsigned long)KIBNAL_RX_VADDR(rx)),
                .l_key     = KIBNAL_RX_LKEY(rx),
                .length    = IBNAL_MSG_SIZE,
        };

        rx->rx_wrq = (vv_wr_t) {
                .wr_id                   = kibnal_ptr2wreqid(rx, IBNAL_WID_RX),
                .completion_notification = 1,
                .scatgat_list            = &rx->rx_gl,
                .num_of_data_segments    = 1,
                .wr_type                 = vv_wr_receive,
        };

        LASSERT (conn->ibc_state >= IBNAL_CONN_INIT);
        LASSERT (!rx->rx_posted);

        CDEBUG(D_NET, "posting rx [%d %x %p]\n",
               rx->rx_wrq.scatgat_list->length,
               rx->rx_wrq.scatgat_list->l_key,
               rx->rx_wrq.scatgat_list->v_address);

        if (conn->ibc_state > IBNAL_CONN_ESTABLISHED) {
                /* No more posts for this rx; so lose its ref */
                kibnal_conn_decref(conn);
                return 0;
        }

        rx->rx_posted = 1;

        spin_lock(&conn->ibc_lock);
        /* Serialise vv_post_receive; it's not re-entrant on the same QP */
        vvrc = vv_post_receive(kibnal_data.kib_hca,
                               conn->ibc_qp, &rx->rx_wrq);
        spin_unlock(&conn->ibc_lock);

        if (vvrc == vv_return_ok) {
                if (credit) {
                        spin_lock(&conn->ibc_lock);
                        conn->ibc_outstanding_credits++;
                        spin_unlock(&conn->ibc_lock);

                        kibnal_check_sends(conn);
                }
                return 0;
        }

        CERROR ("post rx -> "LPX64" failed %d\n",
                conn->ibc_peer->ibp_nid, vvrc);
        rc = -EIO;
        kibnal_close_conn(rx->rx_conn, rc);
        /* No more posts for this rx; so lose its ref */
        kibnal_conn_decref(conn);
        return rc;
}

int
kibnal_post_receives (kib_conn_t *conn)
{
        int    i;
        int    rc;

        LASSERT (conn->ibc_state < IBNAL_CONN_ESTABLISHED);
        LASSERT (conn->ibc_comms_error == 0);

        for (i = 0; i < IBNAL_RX_MSGS; i++) {
                /* +1 ref for rx desc.  This ref remains until kibnal_post_rx
                 * fails (i.e. actual failure or we're disconnecting) */
                kibnal_conn_addref(conn);
                rc = kibnal_post_rx (&conn->ibc_rxs[i], 0);
                if (rc != 0)
                        return rc;
        }

        return 0;
}

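/* Request/response matching: every tx that expects a response from the peer
 * carries a 64-bit cookie (assigned in kibnal_get_idle_tx) and sits on
 * ibc_active_txs with tx_waiting set.  Incoming completion messages quote
 * the cookie back, and kibnal_find_waiting_tx_locked() walks the active list
 * looking for the match; a cookie match with the wrong message type, or on a
 * tx that is no longer waiting, indicates a confused peer. */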
kib_tx_t *
kibnal_find_waiting_tx_locked(kib_conn_t *conn, int txtype, __u64 cookie)
{
        struct list_head   *tmp;

        list_for_each(tmp, &conn->ibc_active_txs) {
                kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);

                LASSERT (tx->tx_sending != 0 || tx->tx_waiting);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_waiting &&
                    tx->tx_msg->ibm_type == txtype)
                        return tx;

                CWARN("Bad completion: %swaiting, type %x (wanted %x)\n",
                      tx->tx_waiting ? "" : "NOT ",
                      tx->tx_msg->ibm_type, txtype);
        }
        return NULL;
}

void
kibnal_handle_completion(kib_conn_t *conn, int txtype, int status, __u64 cookie)
{
        kib_tx_t    *tx;
        int          idle;

        spin_lock(&conn->ibc_lock);

        tx = kibnal_find_waiting_tx_locked(conn, txtype, cookie);
        if (tx == NULL) {
                spin_unlock(&conn->ibc_lock);

                CWARN("Unmatched completion type %x cookie "LPX64
                      " from "LPX64"\n",
                      txtype, cookie, conn->ibc_peer->ibp_nid);
                kibnal_close_conn (conn, -EPROTO);
                return;
        }

        if (tx->tx_status == 0) {               /* success so far */
                if (status < 0) {               /* failed? */
                        tx->tx_status = status;
                } else if (txtype == IBNAL_MSG_GET_REQ) {
                        /* XXX layering violation: set REPLY data length */
                        LASSERT (tx->tx_libmsg[1] != NULL);
                        LASSERT (tx->tx_libmsg[1]->ev.type ==
                                 PTL_EVENT_REPLY_END);

                        tx->tx_libmsg[1]->ev.mlength = status;
                }
        }

        tx->tx_waiting = 0;

        idle = tx->tx_sending == 0;
        if (idle)
                list_del(&tx->tx_list);

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kibnal_tx_done(tx);
}

void
kibnal_send_completion (kib_conn_t *conn, int type, int status, __u64 cookie)
{
        kib_tx_t    *tx = kibnal_get_idle_tx(0);

        if (tx == NULL) {
                CERROR("Can't get tx for completion %x for "LPX64"\n",
                       type, conn->ibc_peer->ibp_nid);
                return;
        }

        tx->tx_msg->ibm_u.completion.ibcm_status = status;
        tx->tx_msg->ibm_u.completion.ibcm_cookie = cookie;
        kibnal_init_tx_msg(tx, type, sizeof(kib_completion_msg_t));

        kibnal_queue_tx(tx, conn);
}

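/* Message exchanges dispatched below (as implied by the switch cases):
 *
 *   immediate:  IMMEDIATE                            (payload in the message)
 *   PUT:        PUT_REQ -> PUT_ACK -> RDMA write -> PUT_DONE
 *               (or PUT_REQ -> PUT_NAK if the PUT didn't match)
 *   GET:        GET_REQ -> RDMA write -> GET_DONE
 *
 * The RDMA is always a write performed by whichever side holds the data,
 * into the fragments the peer described in its ACK/REQ. */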
void
kibnal_handle_rx (kib_rx_t *rx)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        int           credits = msg->ibm_credits;
        kib_tx_t     *tx;
        int           rc;

        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);

        CDEBUG (D_NET, "Received %x[%d] from "LPX64"\n",
                msg->ibm_type, credits, conn->ibc_peer->ibp_nid);

        if (credits != 0) {
                /* Have I received credits that will let me send? */
                spin_lock(&conn->ibc_lock);
                conn->ibc_credits += credits;
                spin_unlock(&conn->ibc_lock);

                kibnal_check_sends(conn);
        }

        switch (msg->ibm_type) {
        default:
                CERROR("Bad IBNAL message type %x from "LPX64"\n",
                       msg->ibm_type, conn->ibc_peer->ibp_nid);
                break;

        case IBNAL_MSG_NOOP:
                break;

        case IBNAL_MSG_IMMEDIATE:
                lib_parse(&kibnal_lib, &msg->ibm_u.immediate.ibim_hdr, rx);
                break;

        case IBNAL_MSG_PUT_REQ:
                rx->rx_responded = 0;
                lib_parse(&kibnal_lib, &msg->ibm_u.putreq.ibprm_hdr, rx);
                if (rx->rx_responded)
                        break;

                /* I wasn't asked to transfer any payload data.  This happens
                 * if the PUT didn't match, or got truncated. */
                kibnal_send_completion(rx->rx_conn, IBNAL_MSG_PUT_NAK, 0,
                                       msg->ibm_u.putreq.ibprm_cookie);
                break;

        case IBNAL_MSG_PUT_NAK:
                CWARN ("PUT_NAK from "LPX64"\n", conn->ibc_peer->ibp_nid);
                kibnal_handle_completion(conn, IBNAL_MSG_PUT_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBNAL_MSG_PUT_ACK:
                spin_lock(&conn->ibc_lock);
                tx = kibnal_find_waiting_tx_locked(conn, IBNAL_MSG_PUT_REQ,
                                                   msg->ibm_u.putack.ibpam_src_cookie);
                if (tx != NULL)
                        list_del(&tx->tx_list);
                spin_unlock(&conn->ibc_lock);

                if (tx == NULL) {
                        CERROR("Unmatched PUT_ACK from "LPX64"\n",
                               conn->ibc_peer->ibp_nid);
                        kibnal_close_conn(conn, -EPROTO);
                        break;
                }

                LASSERT (tx->tx_waiting);
                /* CAVEAT EMPTOR: I could be racing with tx_complete, but...
                 * (a) I can overwrite tx_msg since my peer has received it!
                 * (b) while tx_waiting is set, tx_complete() won't touch it.
                 */

                tx->tx_nwrq = 0;                /* overwrite PUT_REQ */

                rc = kibnal_init_rdma(tx, IBNAL_MSG_PUT_DONE,
                                      kibnal_rd_size(&msg->ibm_u.putack.ibpam_rd),
                                      &msg->ibm_u.putack.ibpam_rd,
                                      msg->ibm_u.putack.ibpam_dst_cookie);
                if (rc < 0)
                        CERROR("Can't setup rdma for PUT to "LPX64": %d\n",
                               conn->ibc_peer->ibp_nid, rc);

                spin_lock(&conn->ibc_lock);
                if (tx->tx_status == 0 && rc < 0)
                        tx->tx_status = rc;
                tx->tx_waiting = 0;             /* clear waiting and queue atomically */
                kibnal_queue_tx_locked(tx, conn);
                spin_unlock(&conn->ibc_lock);
                break;

        case IBNAL_MSG_PUT_DONE:
                kibnal_handle_completion(conn, IBNAL_MSG_PUT_ACK,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBNAL_MSG_GET_REQ:
                rx->rx_responded = 0;
                lib_parse(&kibnal_lib, &msg->ibm_u.get.ibgm_hdr, rx);
                if (rx->rx_responded)           /* I responded to the GET_REQ */
                        break;
                /* NB GET didn't match (I'd have responded even with no payload
                 * data) */
                kibnal_send_completion(rx->rx_conn, IBNAL_MSG_GET_DONE, -ENODATA,
                                       msg->ibm_u.get.ibgm_cookie);
                break;

        case IBNAL_MSG_GET_DONE:
                kibnal_handle_completion(conn, IBNAL_MSG_GET_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;
        }

        kibnal_post_rx(rx, 1);
}

void
kibnal_rx_complete (kib_rx_t *rx, vv_comp_status_t vvrc, int nob)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        unsigned long flags;
        int           rc;

        CDEBUG (D_NET, "rx %p conn %p\n", rx, conn);
        LASSERT (rx->rx_posted);
        rx->rx_posted = 0;

        if (conn->ibc_state > IBNAL_CONN_ESTABLISHED)
                goto ignore;

        if (vvrc != vv_comp_status_success) {
                CERROR("Rx from "LPX64" failed: %d\n",
                       conn->ibc_peer->ibp_nid, vvrc);
                goto failed;
        }

        rc = kibnal_unpack_msg(msg, nob);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from "LPX64"\n",
                        rc, conn->ibc_peer->ibp_nid);
                goto failed;
        }

        if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
            msg->ibm_srcstamp != conn->ibc_incarnation ||
            msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid ||
            msg->ibm_dststamp != kibnal_data.kib_incarnation) {
                CERROR ("Stale rx from "LPX64"\n",
                        conn->ibc_peer->ibp_nid);
                goto failed;
        }

        /* racing with connection establishment/teardown! */

        if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
                /* must check holding global lock to eliminate race */
                if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                        list_add_tail(&rx->rx_list, &conn->ibc_early_rxs);
                        write_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                                flags);
                        return;
                }
                write_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                        flags);
        }
        kibnal_handle_rx(rx);
        return;

 failed:
        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        kibnal_close_conn(conn, -EIO);
 ignore:
        /* Don't re-post rx & drop its ref on conn */
        kibnal_conn_decref(conn);
}

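/* Two memory registration strategies follow.  With IBNAL_WHOLE_MEM, all of
 * memory is pre-registered and an RDMA descriptor is built fragment by
 * fragment from the pages backing the payload (kibnal_append_rdfrag).
 * Without it, each tx maps its payload on demand with
 * vv_mem_region_register()/vv_phy_mem_region_register() into a single
 * virtually-contiguous region, unmapped again in kibnal_tx_done(). */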
#if IBNAL_WHOLE_MEM
int
kibnal_append_rdfrag(kib_rdma_desc_t *rd, int active, struct page *page,
                     unsigned long page_offset, unsigned long len)
{
        kib_rdma_frag_t *frag = &rd->rd_frags[rd->rd_nfrag];
        vv_l_key_t       l_key;
        vv_r_key_t       r_key;
        void            *addr;
        void            *vaddr;
        vv_mem_reg_h_t   mem_h;
        vv_return_t      vvrc;

        if (rd->rd_nfrag >= IBNAL_MAX_RDMA_FRAGS) {
                CERROR ("Too many RDMA fragments\n");
                return -EMSGSIZE;
        }

        addr = (void *)(((unsigned long)kmap(page)) + page_offset);

        vvrc = vv_get_gen_mr_attrib(kibnal_data.kib_hca, addr,
                                    len, &mem_h, &l_key, &r_key);
        LASSERT (vvrc == vv_return_ok);

        kunmap(page);

        if (active) {
                if (rd->rd_nfrag == 0) {
                        rd->rd_key = l_key;
                } else if (l_key != rd->rd_key) {
                        CERROR ("> 1 key for single RDMA desc\n");
                        return -EINVAL;
                }
                vaddr = addr;
        } else {
                if (rd->rd_nfrag == 0) {
                        rd->rd_key = r_key;
                } else if (r_key != rd->rd_key) {
                        CERROR ("> 1 key for single RDMA desc\n");
                        return -EINVAL;
                }
                vv_va2advertise_addr(kibnal_data.kib_hca, addr, &vaddr);
        }

        kibnal_rf_set(frag, (unsigned long)vaddr, len);

        CDEBUG(D_NET,"map frag [%d][%d %x %08x%08x] %p\n",
               rd->rd_nfrag, frag->rf_nob, rd->rd_key,
               frag->rf_addr_hi, frag->rf_addr_lo, addr);

        rd->rd_nfrag++;
        return 0;
}

struct page *
kibnal_kvaddr_to_page (unsigned long vaddr)
{
        struct page *page;

        if (vaddr >= VMALLOC_START &&
            vaddr < VMALLOC_END)
                page = vmalloc_to_page ((void *)vaddr);
#if CONFIG_HIGHMEM
        else if (vaddr >= PKMAP_BASE &&
                 vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
                page = vmalloc_to_page ((void *)vaddr);
        /* in 2.4 ^ just walks the page tables */
#endif
        else
                page = virt_to_page (vaddr);

        return VALID_PAGE(page) ? page : NULL;
}

int
kibnal_setup_rd_iov(kib_tx_t *tx, kib_rdma_desc_t *rd,
                    vv_access_con_bit_mask_t access,
                    int niov, struct iovec *iov, int offset, int nob)
{
        /* active if I'm sending */
        int           active = ((access & vv_acc_r_mem_write) == 0);
        int           fragnob;
        int           rc;
        unsigned long vaddr;
        struct page  *page;
        int           page_offset;

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT ((rd != tx->tx_rd) == !active);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        rd->rd_nfrag = 0;
        do {
                LASSERT (niov > 0);

                vaddr = ((unsigned long)iov->iov_base) + offset;
                page_offset = vaddr & (PAGE_SIZE - 1);
                page = kibnal_kvaddr_to_page(vaddr);
                if (page == NULL) {
                        CERROR ("Can't find page\n");
                        return -EFAULT;
                }

                fragnob = min((int)(iov->iov_len - offset), nob);
                fragnob = min(fragnob, (int)PAGE_SIZE - page_offset);

                rc = kibnal_append_rdfrag(rd, active, page,
                                          page_offset, fragnob);
                if (rc != 0)
                        return rc;

                if (offset + fragnob < iov->iov_len) {
                        offset += fragnob;
                } else {
                        offset = 0;
                        iov++;
                        niov--;
                }
                nob -= fragnob;
        } while (nob > 0);

        return 0;
}

int
kibnal_setup_rd_kiov (kib_tx_t *tx, kib_rdma_desc_t *rd,
                      vv_access_con_bit_mask_t access,
                      int nkiov, ptl_kiov_t *kiov, int offset, int nob)
{
        /* active if I'm sending */
        int            active = ((access & vv_acc_r_mem_write) == 0);
        int            fragnob;
        int            rc;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT ((rd != tx->tx_rd) == !active);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        rd->rd_nfrag = 0;
        do {
                LASSERT (nkiov > 0);
                fragnob = min((int)(kiov->kiov_len - offset), nob);

                rc = kibnal_append_rdfrag(rd, active, kiov->kiov_page,
                                          kiov->kiov_offset + offset,
                                          fragnob);
                if (rc != 0)
                        return rc;

                offset = 0;
                kiov++;
                nkiov--;
                nob -= fragnob;
        } while (nob > 0);

        return 0;
}
#else
int
kibnal_setup_rd_iov (kib_tx_t *tx, kib_rdma_desc_t *rd,
                     vv_access_con_bit_mask_t access,
                     int niov, struct iovec *iov, int offset, int nob)
{
        /* active if I'm sending */
        int         active = ((access & vv_acc_r_mem_write) == 0);
        void       *vaddr;
        vv_return_t vvrc;

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
        LASSERT ((rd != tx->tx_rd) == !active);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR ("Can't map multiple vaddr fragments\n");
                return (-EMSGSIZE);
        }

        vaddr = (void *)(((unsigned long)iov->iov_base) + offset);
        tx->tx_md.md_addr = (__u64)((unsigned long)vaddr);

        vvrc = vv_mem_region_register(kibnal_data.kib_hca, vaddr, nob,
                                      kibnal_data.kib_pd, access,
                                      &tx->tx_md.md_handle,
                                      &tx->tx_md.md_lkey,
                                      &tx->tx_md.md_rkey);
        if (vvrc != vv_return_ok) {
                CERROR ("Can't map vaddr %p: %d\n", vaddr, vvrc);
                return -EFAULT;
        }

        tx->tx_mapped = KIB_TX_MAPPED;

        rd->rd_key = active ? tx->tx_md.md_lkey : tx->tx_md.md_rkey;
        rd->rd_nfrag = 1;
        kibnal_rf_set(&rd->rd_frags[0], tx->tx_md.md_addr, nob);

        return (0);
}

int
kibnal_setup_rd_kiov (kib_tx_t *tx, kib_rdma_desc_t *rd,
                      vv_access_con_bit_mask_t access,
                      int nkiov, ptl_kiov_t *kiov, int offset, int nob)
{
        /* active if I'm sending */
        int            active = ((access & vv_acc_r_mem_write) == 0);
        vv_return_t    vvrc;
        vv_phy_list_t  phys_pages;
        vv_phy_buf_t  *phys;
        int            page_offset;
        int            nphys;
        int            resid;
        int            phys_size;
        int            rc;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
        LASSERT ((rd != tx->tx_rd) == !active);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        phys_size = nkiov * sizeof (*phys);
        PORTAL_ALLOC(phys, phys_size);
        if (phys == NULL) {
                CERROR ("Can't allocate tmp phys\n");
                return (-ENOMEM);
        }

        page_offset = kiov->kiov_offset + offset;

        phys[0].start = kibnal_page2phys(kiov->kiov_page);
        phys[0].size = PAGE_SIZE;

        nphys = 1;
        resid = nob - (kiov->kiov_len - offset);

        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        int i;
                        /* Can't have gaps */
                        CERROR ("Can't make payload contiguous in I/O VM:"
                                "page %d, offset %d, len %d \n", nphys,
                                kiov->kiov_offset, kiov->kiov_len);

                        for (i = -nphys; i < nkiov; i++)
                                CERROR("kiov[%d] %p +%d for %d\n",
                                       i, kiov[i].kiov_page,
                                       kiov[i].kiov_offset,
                                       kiov[i].kiov_len);

                        rc = -EINVAL;
                        goto out;
                }

                LASSERT (nphys * sizeof (*phys) < phys_size);
                phys[nphys].start = kibnal_page2phys(kiov->kiov_page);
                phys[nphys].size = PAGE_SIZE;

                nphys++;
                resid -= PAGE_SIZE;
        }

#if 0
        CWARN ("nphys %d, nob %d, page_offset %d\n", nphys, nob, page_offset);
        for (i = 0; i < nphys; i++)
                CWARN ("   [%d] "LPX64"\n", i, phys[i]);
#endif

        vvrc = vv_phy_mem_region_register(kibnal_data.kib_hca,
                                          &phys_pages,
                                          IBNAL_RDMA_BASE,
                                          nphys,
                                          page_offset,
                                          kibnal_data.kib_pd,
                                          access,
                                          &tx->tx_md.md_handle,
                                          &tx->tx_md.md_addr,
                                          &tx->tx_md.md_lkey,
                                          &tx->tx_md.md_rkey);

        if (vvrc != vv_return_ok) {
                CERROR ("Can't map phys: %d\n", vvrc);
                rc = -EFAULT;
                goto out;
        }

        CDEBUG(D_NET, "Mapped %d pages %d bytes @ offset %d: "
               "lkey %x, rkey %x, addr "LPX64"\n",
               nphys, nob, page_offset, tx->tx_md.md_lkey, tx->tx_md.md_rkey,
               tx->tx_md.md_addr);

        tx->tx_mapped = KIB_TX_MAPPED;
        rc = 0;

        rd->rd_key = active ? tx->tx_md.md_lkey : tx->tx_md.md_rkey;
        rd->rd_nfrag = 1;
        kibnal_rf_set(&rd->rd_frags[0], tx->tx_md.md_addr, nob);

 out:
        PORTAL_FREE(phys, phys_size);
        return (rc);
}
#endif

kib_conn_t *
kibnal_find_conn_locked (kib_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->ibp_conns) {
                return (list_entry(tmp, kib_conn_t, ibc_list));
        }

        return (NULL);
}

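/* Credit-based flow control: ibc_credits is how many messages I may post to
 * the peer (one per rx buffer it has posted for me); ibc_outstanding_credits
 * counts rx buffers I have re-posted locally, to be returned to the peer
 * piggy-backed on the next outgoing message (kibnal_pack_msg).  The last
 * credit is never spent unless credits are being returned with it, so the
 * peer can always send credits back, and a NOOP is generated to return
 * credits when the queue is otherwise empty (IBNAL_CREDIT_HIGHWATER). */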
void
kibnal_check_sends (kib_conn_t *conn)
{
        kib_tx_t       *tx;
        vv_return_t     vvrc;
        int             rc;
        int             i;
        int             done;

        /* Don't send anything until after the connection is established */
        if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                CDEBUG(D_NET, LPX64": too soon\n", conn->ibc_peer->ibp_nid);
                return;
        }

        spin_lock(&conn->ibc_lock);

        LASSERT (conn->ibc_nsends_posted <= IBNAL_MSG_QUEUE_SIZE);

        if (list_empty(&conn->ibc_tx_queue) &&
            conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER) {
                spin_unlock(&conn->ibc_lock);

                tx = kibnal_get_idle_tx(0);     /* don't block */
                if (tx != NULL)
                        kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);

                spin_lock(&conn->ibc_lock);

                if (tx != NULL)
                        kibnal_queue_tx_locked(tx, conn);
        }

        while (!list_empty (&conn->ibc_tx_queue)) {
                tx = list_entry (conn->ibc_tx_queue.next, kib_tx_t, tx_list);

                /* We rely on this for QP sizing */
                LASSERT (tx->tx_nwrq > 0 && tx->tx_nwrq <= 1 + IBNAL_MAX_RDMA_FRAGS);

                LASSERT (conn->ibc_outstanding_credits >= 0);
                LASSERT (conn->ibc_outstanding_credits <= IBNAL_MSG_QUEUE_SIZE);
                LASSERT (conn->ibc_credits >= 0);
                LASSERT (conn->ibc_credits <= IBNAL_MSG_QUEUE_SIZE);

                if (conn->ibc_nsends_posted == IBNAL_MSG_QUEUE_SIZE) {
                        CDEBUG(D_NET, LPX64": posted enough\n",
                               conn->ibc_peer->ibp_nid);
                        break;
                }

                if (conn->ibc_credits == 0) {   /* no credits */
                        CDEBUG(D_NET, LPX64": no credits\n",
                               conn->ibc_peer->ibp_nid);
                        break;
                }

                if (conn->ibc_credits == 1 &&   /* last credit reserved for */
                    conn->ibc_outstanding_credits == 0) { /* giving back credits */
                        CDEBUG(D_NET, LPX64": not using last credit\n",
                               conn->ibc_peer->ibp_nid);
                        break;
                }

                list_del (&tx->tx_list);

                if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
                    (!list_empty(&conn->ibc_tx_queue) ||
                     conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER)) {
                        /* redundant NOOP */
                        spin_unlock(&conn->ibc_lock);
                        kibnal_tx_done(tx);
                        spin_lock(&conn->ibc_lock);
                        CDEBUG(D_NET, LPX64": redundant noop\n",
                               conn->ibc_peer->ibp_nid);
                        continue;
                }

                kibnal_pack_msg(tx->tx_msg, conn->ibc_outstanding_credits,
                                conn->ibc_peer->ibp_nid, conn->ibc_incarnation);

                conn->ibc_outstanding_credits = 0;
                conn->ibc_nsends_posted++;
                conn->ibc_credits--;

                /* CAVEAT EMPTOR!  This tx could be the PUT_DONE of an RDMA
                 * PUT.  If so, it was first queued here as a PUT_REQ, sent and
                 * stashed on ibc_active_txs, matched by an incoming PUT_ACK,
                 * and then re-queued here.  It's (just) possible that
                 * tx_sending is non-zero if we've not done the tx_complete()
                 * from the first send; hence the increment rather than an
                 * assignment below. */
                tx->tx_sending++;

                list_add (&tx->tx_list, &conn->ibc_active_txs);

                /* Keep holding ibc_lock while posting sends on this
                 * connection; vv_post_send() isn't re-entrant on the same
                 * QP!! */

                LASSERT (tx->tx_nwrq > 0);

                rc = -ECONNABORTED;
                vvrc = vv_return_ok;
                if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                        tx->tx_status = 0;
#if 1
                        vvrc = vv_post_send_list(kibnal_data.kib_hca,
                                                 conn->ibc_qp,
                                                 tx->tx_nwrq,
                                                 tx->tx_wrq,
                                                 vv_operation_type_send_rc);
                        rc = (vvrc == vv_return_ok) ? 0 : -EIO;
#else
                        /* Only post 1 item at a time for now (so we know
                         * exactly how many got posted successfully) */
                        for (i = 0; i < tx->tx_nwrq; i++) {
                                switch (tx->tx_wrq[i].wr_type) {
                                case vv_wr_send:
                                        CDEBUG(D_NET, "[%d]posting send [%d %x %p]%s: %x\n",
                                               i,
                                               tx->tx_wrq[i].scatgat_list->length,
                                               tx->tx_wrq[i].scatgat_list->l_key,
                                               tx->tx_wrq[i].scatgat_list->v_address,
                                               tx->tx_wrq[i].type.send.send_qp_type.rc_type.fance_indicator ?
                                               "(fence)":"",
                                               tx->tx_msg->ibm_type);
                                        break;
                                case vv_wr_rdma_write:
                                        CDEBUG(D_NET, "[%d]posting PUT  [%d %x %p]->[%x "LPX64"]\n",
                                               i,
                                               tx->tx_wrq[i].scatgat_list->length,
                                               tx->tx_wrq[i].scatgat_list->l_key,
                                               tx->tx_wrq[i].scatgat_list->v_address,
                                               tx->tx_wrq[i].type.send.send_qp_type.rc_type.r_r_key,
                                               tx->tx_wrq[i].type.send.send_qp_type.rc_type.r_addr);
                                        break;
                                case vv_wr_rdma_read:
                                        CDEBUG(D_NET, "[%d]posting GET  [%d %x %p]->[%x "LPX64"]\n",
                                               i,
                                               tx->tx_wrq[i].scatgat_list->length,
                                               tx->tx_wrq[i].scatgat_list->l_key,
                                               tx->tx_wrq[i].scatgat_list->v_address,
                                               tx->tx_wrq[i].type.send.send_qp_type.rc_type.r_r_key,
                                               tx->tx_wrq[i].type.send.send_qp_type.rc_type.r_addr);
                                        break;
                                default:
                                        LBUG();
                                }
                                vvrc = vv_post_send(kibnal_data.kib_hca,
                                                    conn->ibc_qp,
                                                    &tx->tx_wrq[i],
                                                    vv_operation_type_send_rc);
                                CDEBUG(D_NET, LPX64": post %d/%d\n",
                                       conn->ibc_peer->ibp_nid, i, tx->tx_nwrq);
                                if (vvrc != vv_return_ok) {
                                        rc = -EIO;
                                        break;
                                }
                        }
#endif
                }

                if (rc != 0) {
                        /* NB credits are transferred in the actual
                         * message, which can only be the last work item */
                        conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
                        conn->ibc_credits++;
                        conn->ibc_nsends_posted--;

                        tx->tx_status = rc;
                        tx->tx_waiting = 0;
                        tx->tx_sending--;

                        done = (tx->tx_sending == 0);
                        if (done)
                                list_del (&tx->tx_list);

                        spin_unlock(&conn->ibc_lock);

                        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
                                CERROR ("Error %d posting transmit to "LPX64"\n",
                                        vvrc, conn->ibc_peer->ibp_nid);
                        else
                                CDEBUG (D_NET, "Error %d posting transmit to "
                                        LPX64"\n", rc, conn->ibc_peer->ibp_nid);

                        kibnal_close_conn (conn, rc);

                        if (done)
                                kibnal_tx_done (tx);
                        return;
                }
        }

        spin_unlock(&conn->ibc_lock);
}

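/* Send-completion handler, presumably invoked from the CQ event path and
 * demultiplexed via the wr_id stamped by kibnal_ptr2wreqid().  tx_sending
 * counts postings not yet completed; the completion that drops it to zero on
 * a tx that is no longer waiting for a peer response makes the tx idle and
 * may free it, so the conn ref taken below keeps 'conn' alive until this
 * handler is done with it. */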
void
kibnal_tx_complete (kib_tx_t *tx, vv_comp_status_t vvrc)
{
        kib_conn_t   *conn = tx->tx_conn;
        int           failed = (vvrc != vv_comp_status_success);
        int           idle;

        CDEBUG(D_NET, "tx %p conn %p sending %d nwrq %d vvrc %d\n",
               tx, conn, tx->tx_sending, tx->tx_nwrq, vvrc);

        LASSERT (tx->tx_sending != 0);

        if (failed &&
            tx->tx_status == 0 &&
            conn->ibc_state == IBNAL_CONN_ESTABLISHED)
                CERROR ("Tx completion to "LPX64" failed: %d\n",
                        conn->ibc_peer->ibp_nid, vvrc);

        spin_lock(&conn->ibc_lock);

        /* I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'. */

        tx->tx_sending--;

        if (failed) {
                tx->tx_waiting = 0;
                tx->tx_status = -EIO;
        }

        idle = (tx->tx_sending == 0) &&         /* This is the final callback */
               !tx->tx_waiting;                 /* Not waiting for peer */
        if (idle)
                list_del(&tx->tx_list);

        kibnal_conn_addref(conn);               /* 1 ref for me.... */

        /* NB a tx can be posted more than once (e.g. as PUT_REQ and again as
         * PUT_DONE), and ibc_nsends_posted is incremented once per posting in
         * kibnal_check_sends(), so it must be decremented once per
         * completion, not just on the final one. */
        conn->ibc_nsends_posted--;

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kibnal_tx_done (tx);

        if (failed)
                kibnal_close_conn (conn, -EIO);
        else
                kibnal_check_sends(conn);

        kibnal_conn_decref(conn);               /* ...until here */
}

void
kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob)
{
        vv_scatgat_t *gl = &tx->tx_gl[tx->tx_nwrq];
        vv_wr_t      *wrq = &tx->tx_wrq[tx->tx_nwrq];
        int           nob = offsetof (kib_msg_t, ibm_u) + body_nob;

        LASSERT (tx->tx_nwrq >= 0 &&
                 tx->tx_nwrq < (1 + IBNAL_MAX_RDMA_FRAGS));
        LASSERT (nob <= IBNAL_MSG_SIZE);

        kibnal_init_msg(tx->tx_msg, type, body_nob);

        *gl = (vv_scatgat_t) {
                .v_address = (void *)((unsigned long)KIBNAL_TX_VADDR(tx)),
                .l_key     = KIBNAL_TX_LKEY(tx),
                .length    = nob,
        };

        memset(wrq, 0, sizeof(*wrq));

        wrq->wr_id = kibnal_ptr2wreqid(tx, IBNAL_WID_TX);
        wrq->wr_type = vv_wr_send;
        wrq->scatgat_list = gl;
        wrq->num_of_data_segments = 1;
        wrq->completion_notification = 1;
        wrq->type.send.solicited_event = 1;
        wrq->type.send.immidiate_data_indicator = 0;
        wrq->type.send.send_qp_type.rc_type.fance_indicator = 0;

        tx->tx_nwrq++;
}

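/* kibnal_init_rdma() walks the source descriptor (tx->tx_rd) and the peer's
 * destination descriptor in lock-step, emitting one RDMA-write work request
 * for each contiguous overlap of a source and a destination fragment; a
 * partially-consumed fragment is trimmed in place and the loop continues
 * until 'nob' bytes are covered.  It then appends the GET_DONE/PUT_DONE
 * completion message, so the transfer status travels to the peer in the same
 * work-request list. */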
int
kibnal_init_rdma (kib_tx_t *tx, int type, int nob,
                  kib_rdma_desc_t *dstrd, __u64 dstcookie)
{
        /* CAVEAT EMPTOR: this 'consumes' the frags in 'dstrd' */
        int              resid = nob;
        kib_msg_t       *ibmsg = tx->tx_msg;
        kib_rdma_desc_t *srcrd = tx->tx_rd;
        kib_rdma_frag_t *srcfrag;
        int              srcidx;
        kib_rdma_frag_t *dstfrag;
        int              dstidx;
        vv_scatgat_t    *gl;
        vv_wr_t         *wrq;
        int              wrknob;
        int              rc;

        /* Called by scheduler */
        LASSERT (!in_interrupt());

        LASSERT (type == IBNAL_MSG_GET_DONE ||
                 type == IBNAL_MSG_PUT_DONE);

        srcidx = dstidx = 0;
        srcfrag = &srcrd->rd_frags[0];
        dstfrag = &dstrd->rd_frags[0];
        rc = resid;

        while (resid > 0) {
                if (srcidx >= srcrd->rd_nfrag) {
                        CERROR("Src buffer exhausted: %d frags\n", srcidx);
                        rc = -EPROTO;
                        break;
                }

                if (dstidx == dstrd->rd_nfrag) {
                        CERROR("Dst buffer exhausted: %d frags\n", dstidx);
                        rc = -EPROTO;
                        break;
                }

                if (tx->tx_nwrq == IBNAL_MAX_RDMA_FRAGS) {
                        CERROR("RDMA too fragmented: %d/%d src %d/%d dst frags\n",
                               srcidx, srcrd->rd_nfrag,
                               dstidx, dstrd->rd_nfrag);
                        rc = -EMSGSIZE;
                        break;
                }

                wrknob = MIN(MIN(srcfrag->rf_nob, dstfrag->rf_nob), resid);

                gl = &tx->tx_gl[tx->tx_nwrq];
                gl->v_address = (void *)((unsigned long)kibnal_rf_addr(srcfrag));
                gl->length    = wrknob;
                gl->l_key     = srcrd->rd_key;

                wrq = &tx->tx_wrq[tx->tx_nwrq];

                wrq->wr_id = kibnal_ptr2wreqid(tx, IBNAL_WID_RDMA);
                wrq->completion_notification = 0;
                wrq->scatgat_list = gl;
                wrq->num_of_data_segments = 1;
                wrq->wr_type = vv_wr_rdma_write;
                wrq->type.send.solicited_event = 0;
                wrq->type.send.send_qp_type.rc_type.fance_indicator = 0;
                wrq->type.send.send_qp_type.rc_type.r_addr = kibnal_rf_addr(dstfrag);
                wrq->type.send.send_qp_type.rc_type.r_r_key = dstrd->rd_key;

                resid -= wrknob;

                /* advance each fragment by the bytes just consumed */
                if (wrknob < srcfrag->rf_nob) {
                        kibnal_rf_set(srcfrag,
                                      kibnal_rf_addr(srcfrag) + wrknob,
                                      srcfrag->rf_nob - wrknob);
                } else {
                        srcfrag++;
                        srcidx++;
                }

                if (wrknob < dstfrag->rf_nob) {
                        kibnal_rf_set(dstfrag,
                                      kibnal_rf_addr(dstfrag) + wrknob,
                                      dstfrag->rf_nob - wrknob);
                } else {
                        dstfrag++;
                        dstidx++;
                }

                tx->tx_nwrq++;
        }

        if (rc < 0)                             /* no RDMA if completing with failure */
                tx->tx_nwrq = 0;

        ibmsg->ibm_u.completion.ibcm_status = rc;
        ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;
        kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));

        return rc;
}

void
kibnal_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
{
        spin_lock(&conn->ibc_lock);
        kibnal_queue_tx_locked (tx, conn);
        spin_unlock(&conn->ibc_lock);

        kibnal_check_sends(conn);
}

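/* kibnal_launch_tx() uses the classic optimistic-lookup pattern: try the
 * peer/conn lookup under the read lock (the common case), and only if no
 * connection exists drop to the write lock and repeat the lookup before
 * mutating peer state, since another thread may have raced in between.
 * NB the irq state saved by read_lock_irqsave() stays disabled across the
 * read_unlock()/write_lock() switch-over and is only restored by the final
 * write_unlock_irqrestore(). */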
void
kibnal_launch_tx (kib_tx_t *tx, ptl_nid_t nid)
{
        kib_peer_t      *peer;
        kib_conn_t      *conn;
        unsigned long    flags;
        rwlock_t        *g_lock = &kibnal_data.kib_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
        LASSERT (tx->tx_nwrq > 0);              /* work items have been set up */

        read_lock_irqsave(g_lock, flags);

        peer = kibnal_find_peer_locked (nid);
        if (peer == NULL) {
                read_unlock_irqrestore(g_lock, flags);
                tx->tx_status = -EHOSTUNREACH;
                kibnal_tx_done (tx);
                return;
        }

        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                kibnal_conn_addref(conn);       /* 1 ref for me... */
                read_unlock_irqrestore(g_lock, flags);

                kibnal_queue_tx (tx, conn);
                kibnal_conn_decref(conn);       /* ...to here */
                return;
        }

        /* Making one or more connections; I'll need a write lock... */
        read_unlock(g_lock);
        write_lock(g_lock);

        peer = kibnal_find_peer_locked (nid);
        if (peer == NULL) {
                write_unlock_irqrestore(g_lock, flags);
                tx->tx_status = -EHOSTUNREACH;
                kibnal_tx_done (tx);
                return;
        }

        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kibnal_conn_addref(conn);       /* 1 ref for me... */
                write_unlock_irqrestore(g_lock, flags);

                kibnal_queue_tx (tx, conn);
                kibnal_conn_decref(conn);       /* ...until here */
                return;
        }

        if (peer->ibp_connecting == 0) {
                if (!time_after_eq(jiffies, peer->ibp_reconnect_time)) {
                        write_unlock_irqrestore(g_lock, flags);
                        tx->tx_status = -EHOSTUNREACH;
                        kibnal_tx_done (tx);
                        return;
                }

                peer->ibp_connecting = 1;
                kibnal_peer_addref(peer); /* extra ref for connd */

                /* NB interrupts are already disabled here ('flags' saved by
                 * read_lock_irqsave above is still needed for the final
                 * unlock), so take kib_connd_lock with a plain spin_lock
                 * rather than clobbering 'flags' with spin_lock_irqsave */
                spin_lock(&kibnal_data.kib_connd_lock);

                list_add_tail (&peer->ibp_connd_list,
                               &kibnal_data.kib_connd_peers);
                wake_up (&kibnal_data.kib_connd_waitq);

                spin_unlock(&kibnal_data.kib_connd_lock);
        }

        /* A connection is being established; queue the message... */
        list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}

int
kibnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
{
        /* I would guess that if kibnal_get_peer (nid) == NULL,
           and we're not routing, then 'nid' is very distant :) */
        if ( nal->libnal_ni.ni_pid.nid == nid ) {
                *dist = 0;
        } else {
                *dist = 1;
        }

        return 0;
}

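/* kibnal_sendmsg() is the common send path for all outgoing Portals message
 * types.  A payload that fits in IBNAL_MSG_SIZE travels inline as an
 * IMMEDIATE message; otherwise the sender advertises an RDMA descriptor
 * (GET sink, PUT source) and the bulk data moves by RDMA write, completed by
 * the GET_DONE/PUT_DONE exchange handled in kibnal_handle_rx() above. */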
1346 ptl_err_t
1347 kibnal_sendmsg(lib_nal_t    *nal, 
1348                void         *private,
1349                lib_msg_t    *libmsg,
1350                ptl_hdr_t    *hdr, 
1351                int           type, 
1352                ptl_nid_t     nid, 
1353                ptl_pid_t     pid,
1354                unsigned int  payload_niov, 
1355                struct iovec *payload_iov, 
1356                ptl_kiov_t   *payload_kiov,
1357                int           payload_offset,
1358                int           payload_nob)
1359 {
1360         kib_msg_t  *ibmsg;
1361         kib_tx_t   *tx;
1362         int         nob;
1363         int         rc;
1364         int         n;
1365
1366         /* NB 'private' is different depending on what we're sending.... */
1367
1368         CDEBUG(D_NET, "sending %d bytes in %d frags to nid:"LPX64
1369                " pid %d\n", payload_nob, payload_niov, nid , pid);
1370
1371         LASSERT (payload_nob == 0 || payload_niov > 0);
1372         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1373
1374         /* Thread context */
1375         LASSERT (!in_interrupt());
1376         /* payload is either all vaddrs or all pages */
1377         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1378
1379         switch (type) {
1380         default:
1381                 LBUG();
1382                 return (PTL_FAIL);
1383                 
1384         case PTL_MSG_REPLY: {
1385                 /* reply's 'private' is the incoming receive */
1386                 kib_rx_t *rx = private;
1387
1388                 LASSERT(rx != NULL);
1389
1390                 if (rx->rx_msg->ibm_type == IBNAL_MSG_IMMEDIATE) {
1391                         /* RDMA not expected */
1392                         nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1393                         if (nob > IBNAL_MSG_SIZE) {
1394                                 CERROR("REPLY for "LPX64" too big (RDMA not requested):"
1395                                        "%d (max for message is %d)\n", 
1396                                        nid, payload_nob, IBNAL_MSG_SIZE);
1397                                 CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
1398                                        nob, nid);
1399                                 return PTL_FAIL;
1400                         }
1401                         break;
1402                 }
1403
1404                 /* Incoming message consistent with RDMA? */
1405                 if (rx->rx_msg->ibm_type != IBNAL_MSG_GET_REQ) {
1406                         CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
1407                                nid, rx->rx_msg->ibm_type);
1408                         return PTL_FAIL;
1409                 }
1410
1411                 /* NB rx_complete() will send GET_NAK when I return to it from
1412                  * here, unless I set rx_responded! */
1413
1414                 tx = kibnal_get_idle_tx(0);
1415                 if (tx == NULL) {
1416                         CERROR("Can't get tx for REPLY to "LPX64"\n", nid);
1417                         return PTL_FAIL;
1418                 }
1419
1420                 if (payload_nob == 0)
1421                         rc = 0;
1422                 else if (payload_kiov == NULL)
1423                         rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 0, 
1424                                                  payload_niov, payload_iov, 
1425                                                  payload_offset, payload_nob);
1426                 else
1427                         rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 0,
1428                                                   payload_niov, payload_kiov,
1429                                                   payload_offset, payload_nob);
1430                 if (rc != 0) {
1431                         CERROR("Can't setup GET src for "LPX64": %d\n", nid, rc);
1432                         kibnal_tx_done(tx);
1433                         return PTL_FAIL;
1434                 }
1435                 
1436                 rc = kibnal_init_rdma(tx, IBNAL_MSG_GET_DONE, payload_nob,
1437                                       &rx->rx_msg->ibm_u.get.ibgm_rd,
1438                                       rx->rx_msg->ibm_u.get.ibgm_cookie);
1439                 if (rc < 0) {
1440                         CERROR("Can't setup rdma for GET from "LPX64": %d\n", 
1441                                nid, rc);
1442                 } else if (rc == 0) {
1443                         /* No RDMA: local completion may happen now! */
1444                         lib_finalize (&kibnal_lib, NULL, libmsg, PTL_OK);
1445                 } else {
1446                         /* RDMA: lib_finalize(libmsg) when it completes */
1447                         tx->tx_libmsg[0] = libmsg;
1448                 }
1449
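                /* NB set rx_responded before returning so that rx_complete()
                 * won't send the GET_NAK mentioned above */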
1450                 kibnal_queue_tx(tx, rx->rx_conn);
1451                 rx->rx_responded = 1;
1452                 return (rc >= 0) ? PTL_OK : PTL_FAIL;
1453         }
1454
1455         case PTL_MSG_GET:
1456                 /* will the REPLY message be small enough not to need RDMA? */
1457                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[libmsg->md->length]);
1458                 if (nob <= IBNAL_MSG_SIZE)
1459                         break;
1460
1461                 tx = kibnal_get_idle_tx(1);     /* may block; caller is an app thread */
1462                 LASSERT (tx != NULL);
1463
1464                 ibmsg = tx->tx_msg;
1465                 ibmsg->ibm_u.get.ibgm_hdr = *hdr;
1466                 ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie;
1467
1468                 if ((libmsg->md->options & PTL_MD_KIOV) == 0)
1469                         rc = kibnal_setup_rd_iov(tx, &ibmsg->ibm_u.get.ibgm_rd,
1470                                                  vv_acc_r_mem_write,
1471                                                  libmsg->md->md_niov,
1472                                                  libmsg->md->md_iov.iov,
1473                                                  0, libmsg->md->length);
1474                 else
1475                         rc = kibnal_setup_rd_kiov(tx, &ibmsg->ibm_u.get.ibgm_rd,
1476                                                   vv_acc_r_mem_write,
1477                                                   libmsg->md->md_niov,
1478                                                   libmsg->md->md_iov.kiov,
1479                                                   0, libmsg->md->length);
1480                 if (rc != 0) {
1481                         CERROR("Can't setup GET sink for "LPX64": %d\n", nid, rc);
1482                         kibnal_tx_done(tx);
1483                         return PTL_FAIL;
1484                 }
1485
1486                 n = ibmsg->ibm_u.get.ibgm_rd.rd_nfrag;
1487                 nob = offsetof(kib_get_msg_t, ibgm_rd.rd_frags[n]);
1488                 kibnal_init_tx_msg(tx, IBNAL_MSG_GET_REQ, nob);
1489
1490                 tx->tx_libmsg[1] = lib_create_reply_msg(&kibnal_lib, nid, libmsg);
1491                 if (tx->tx_libmsg[1] == NULL) {
1492                         CERROR("Can't create reply for GET -> "LPX64"\n", nid);
1493                         kibnal_tx_done(tx);
1494                         return PTL_FAIL;
1495                 }
1496
1497                 tx->tx_libmsg[0] = libmsg;      /* finalise libmsg[0,1] on completion */
1498                 tx->tx_waiting = 1;             /* waiting for GET_DONE */
1499                 kibnal_launch_tx(tx, nid);
1500                 return PTL_OK;
1501
1502         case PTL_MSG_ACK:
1503                 LASSERT (payload_nob == 0);
1504                 break;
1505
1506         case PTL_MSG_PUT:
1507                 /* Is the payload small enough not to need RDMA? */
1508                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1509                 if (nob <= IBNAL_MSG_SIZE)
1510                         break;
1511
1512                 tx = kibnal_get_idle_tx(1);     /* may block: caller is app thread */
1513                 LASSERT (tx != NULL);
1514
1515                 if (payload_kiov == NULL)
1516                         rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 0,
1517                                                  payload_niov, payload_iov,
1518                                                  payload_offset, payload_nob);
1519                 else
1520                         rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 0,
1521                                                   payload_niov, payload_kiov,
1522                                                   payload_offset, payload_nob);
1523                 if (rc != 0) {
1524                         CERROR("Can't setup PUT src for "LPX64": %d\n", nid, rc);
1525                         kibnal_tx_done(tx);
1526                         return PTL_FAIL;
1527                 }
1528
1529                 ibmsg = tx->tx_msg;
1530                 ibmsg->ibm_u.putreq.ibprm_hdr = *hdr;
1531                 ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie;
1532                 kibnal_init_tx_msg(tx, IBNAL_MSG_PUT_REQ, sizeof(kib_putreq_msg_t));
1533
1534                 tx->tx_libmsg[0] = libmsg;      /* finalise libmsg on completion */
1535                 tx->tx_waiting = 1;             /* waiting for PUT_{ACK,NAK} */
1536                 kibnal_launch_tx(tx, nid);
1537                 return PTL_OK;
1538         }
1539
1540         LASSERT (offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob])
1541                  <= IBNAL_MSG_SIZE);
1542
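        /* Send a small message inline: grab a tx descriptor, which may block
         * unless this is an ACK or REPLY (presumably because those are
         * generated in receive context and must not stall) */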
1543         tx = kibnal_get_idle_tx(!(type == PTL_MSG_ACK ||
1544                                   type == PTL_MSG_REPLY));
1545         if (tx == NULL) {
1546                 CERROR ("Can't send %d to "LPX64": tx descs exhausted\n", type, nid);
1547                 return PTL_NO_SPACE;
1548         }
1549
1550         ibmsg = tx->tx_msg;
1551         ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1552
1553         if (payload_nob > 0) {
1554                 if (payload_kiov != NULL)
1555                         lib_copy_kiov2buf(ibmsg->ibm_u.immediate.ibim_payload,
1556                                           payload_niov, payload_kiov,
1557                                           payload_offset, payload_nob);
1558                 else
1559                         lib_copy_iov2buf(ibmsg->ibm_u.immediate.ibim_payload,
1560                                          payload_niov, payload_iov,
1561                                          payload_offset, payload_nob);
1562         }
1563
1564         nob = offsetof(kib_immediate_msg_t, ibim_payload[payload_nob]);
1565         kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE, nob);
1566
1567         tx->tx_libmsg[0] = libmsg;              /* finalise libmsg on completion */
1568         kibnal_launch_tx(tx, nid);
1569         return PTL_OK;
1570 }
1571
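/* The two entry points below differ only in payload type: the lib layer calls
 * kibnal_send() for iovec payloads and kibnal_send_pages() for page (kiov)
 * payloads; both funnel into kibnal_sendmsg() with exactly one of
 * payload_iov/payload_kiov non-NULL */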
1572 ptl_err_t
1573 kibnal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
1574                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1575                unsigned int payload_niov, struct iovec *payload_iov,
1576                size_t payload_offset, size_t payload_len)
1577 {
1578         CDEBUG(D_NET, "pid %d, nid "LPU64"\n",
1579                pid, nid);
1580         return (kibnal_sendmsg(nal, private, cookie,
1581                                hdr, type, nid, pid,
1582                                payload_niov, payload_iov, NULL,
1583                                payload_offset, payload_len));
1584 }
1585
1586 ptl_err_t
1587 kibnal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, 
1588                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1589                      unsigned int payload_niov, ptl_kiov_t *payload_kiov, 
1590                      size_t payload_offset, size_t payload_len)
1591 {
1592         return (kibnal_sendmsg(nal, private, cookie,
1593                                hdr, type, nid, pid,
1594                                payload_niov, NULL, payload_kiov,
1595                                payload_offset, payload_len));
1596 }
1597
1598 ptl_err_t
1599 kibnal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
1600                  unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
1601                  size_t offset, int mlen, int rlen)
1602 {
1603         kib_rx_t    *rx = private;
1604         kib_msg_t   *rxmsg = rx->rx_msg;
1605         kib_conn_t  *conn = rx->rx_conn;
1606         kib_tx_t    *tx;
1607         kib_msg_t   *txmsg;
1608         int          nob;
1609         int          rc;
1610         int          n;
1611         
1612         LASSERT (mlen <= rlen);
1613         LASSERT (mlen >= 0);
1614         LASSERT (!in_interrupt());
1615         /* Either all pages or all vaddrs */
1616         LASSERT (!(kiov != NULL && iov != NULL));
1617
1618         switch (rxmsg->ibm_type) {
1619         default:
1620                 LBUG();
1621                 
1622         case IBNAL_MSG_IMMEDIATE:
1623                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
1624                 if (nob > IBNAL_MSG_SIZE) {
1625                         CERROR ("Immediate message from "LPX64" too big: %d\n",
1626                                 rxmsg->ibm_u.immediate.ibim_hdr.src_nid, rlen);
1627                         return (PTL_FAIL);
1628                 }
1629
1630                 if (kiov != NULL)
1631                         lib_copy_buf2kiov(niov, kiov, offset,
1632                                           rxmsg->ibm_u.immediate.ibim_payload,
1633                                           mlen);
1634                 else
1635                         lib_copy_buf2iov(niov, iov, offset,
1636                                          rxmsg->ibm_u.immediate.ibim_payload,
1637                                          mlen);
1638
1639                 lib_finalize (nal, NULL, libmsg, PTL_OK);
1640                 return (PTL_OK);
1641
1642         case IBNAL_MSG_PUT_REQ:
1643                 /* NB rx_complete() will send PUT_NAK when I return to it from
1644                  * here, unless I set rx_responded!  */
1645
1646                 if (mlen == 0) { /* No payload to RDMA */
1647                         lib_finalize(nal, NULL, libmsg, PTL_OK);
1648                         return PTL_OK;
1649                 }
1650
1651                 tx = kibnal_get_idle_tx(0);
1652                 if (tx == NULL) {
1653                         CERROR("Can't allocate tx for "LPX64"\n",
1654                                conn->ibc_peer->ibp_nid);
1655                         return PTL_FAIL;
1656                 }
1657
1658                 txmsg = tx->tx_msg;
1659                 if (kiov == NULL)
1660                         rc = kibnal_setup_rd_iov(tx, 
1661                                                  &txmsg->ibm_u.putack.ibpam_rd,
1662                                                  vv_acc_r_mem_write,
1663                                                  niov, iov, offset, mlen);
1664                 else
1665                         rc = kibnal_setup_rd_kiov(tx,
1666                                                   &txmsg->ibm_u.putack.ibpam_rd,
1667                                                   vv_acc_r_mem_write,
1668                                                   niov, kiov, offset, mlen);
1669                 if (rc != 0) {
1670                         CERROR("Can't setup PUT sink for "LPX64": %d\n",
1671                                conn->ibc_peer->ibp_nid, rc);
1672                         kibnal_tx_done(tx);
1673                         return PTL_FAIL;
1674                 }
1675
1676                 txmsg->ibm_u.putack.ibpam_src_cookie = rxmsg->ibm_u.putreq.ibprm_cookie;
1677                 txmsg->ibm_u.putack.ibpam_dst_cookie = tx->tx_cookie;
1678
1679                 n = tx->tx_msg->ibm_u.putack.ibpam_rd.rd_nfrag;
1680                 nob = offsetof(kib_putack_msg_t, ibpam_rd.rd_frags[n]);
1681                 kibnal_init_tx_msg(tx, IBNAL_MSG_PUT_ACK, nob);
1682
1683                 tx->tx_libmsg[0] = libmsg;      /* finalise libmsg on completion */
1684                 tx->tx_waiting = 1;             /* waiting for PUT_DONE */
1685                 kibnal_queue_tx(tx, conn);
1686
1687                 LASSERT (!rx->rx_responded);
1688                 rx->rx_responded = 1;
1689                 return PTL_OK;
1690
1691         case IBNAL_MSG_GET_REQ:
1692                 /* We get called here just to discard any junk after the
1693                  * GET hdr. */
1694                 LASSERT (libmsg == NULL);
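                /* (lib_finalize() is expected to tolerate the NULL msg
                 * asserted above) */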
1695                 lib_finalize (nal, NULL, libmsg, PTL_OK);
1696                 return (PTL_OK);
1697         }
1698 }
1699
1700 ptl_err_t
1701 kibnal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
1702               unsigned int niov, struct iovec *iov, 
1703               size_t offset, size_t mlen, size_t rlen)
1704 {
1705         return (kibnal_recvmsg (nal, private, msg, niov, iov, NULL,
1706                                 offset, mlen, rlen));
1707 }
1708
1709 ptl_err_t
1710 kibnal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
1711                      unsigned int niov, ptl_kiov_t *kiov, 
1712                      size_t offset, size_t mlen, size_t rlen)
1713 {
1714         return (kibnal_recvmsg (nal, private, msg, niov, NULL, kiov,
1715                                 offset, mlen, rlen));
1716 }
1717
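/* Thread accounting: kib_nthreads counts live threads, presumably so that
 * module shutdown can wait for it to drop to zero */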
1718 int
1719 kibnal_thread_start (int (*fn)(void *arg), void *arg)
1720 {
1721         long    pid = kernel_thread (fn, arg, 0);
1722
1723         if (pid < 0)
1724                 return ((int)pid);
1725
1726         atomic_inc (&kibnal_data.kib_nthreads);
1727         return (0);
1728 }
1729
1730 void
1731 kibnal_thread_fini (void)
1732 {
1733         atomic_dec (&kibnal_data.kib_nthreads);
1734 }
1735
1736 void
1737 kibnal_close_conn_locked (kib_conn_t *conn, int error)
1738 {
1739         /* This just does the immediate housekeeping.  'error' is zero for a
1740          * normal shutdown which can happen only after the connection has been
1741          * established.  If the connection is established, schedule the
1742          * connection to be finished off by the connd.  Otherwise the connd is
1743          * already dealing with it (either to set it up or tear it down).
1744          * Caller holds kib_global_lock exclusively in irq context */
1745         kib_peer_t   *peer = conn->ibc_peer;
1746
1747         LASSERT (error != 0 || conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
1748
1749         if (error != 0 && conn->ibc_comms_error == 0)
1750                 conn->ibc_comms_error = error;
1751
1752         if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
1753                 return; /* already being handled  */
1754
1755         CDEBUG (error == 0 ? D_NET : D_ERROR,
1756                 "closing conn to "LPX64": error %d\n", peer->ibp_nid, error);
1757
1758         /* connd takes ibc_list's ref */
1759         list_del (&conn->ibc_list);
1760         
1761         if (list_empty (&peer->ibp_conns) &&
1762             peer->ibp_persistence == 0) {
1763                 /* Non-persistent peer with no more conns... */
1764                 kibnal_unlink_peer_locked (peer);
1765         }
1766
1767         kibnal_set_conn_state(conn, IBNAL_CONN_DISCONNECT1);
1768
1769         spin_lock(&kibnal_data.kib_connd_lock);
1770
1771         list_add_tail (&conn->ibc_list, &kibnal_data.kib_connd_conns);
1772         wake_up (&kibnal_data.kib_connd_waitq);
1773                 
1774         spin_unlock(&kibnal_data.kib_connd_lock);
1775 }
1776
1777 void
1778 kibnal_close_conn (kib_conn_t *conn, int error)
1779 {
1780         unsigned long flags;
1781         
1782         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1783
1784         kibnal_close_conn_locked (conn, error);
1785         
1786         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1787 }
1788
1789 void
1790 kibnal_handle_early_rxs(kib_conn_t *conn)
1791 {
1792         unsigned long    flags;
1793         kib_rx_t        *rx;
1794
1795         LASSERT (!in_interrupt());
1796         LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
1797         
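        /* NB the global lock is dropped around each kibnal_handle_rx() call
         * and the list is rescanned from the top each time round */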
1798         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1799         while (!list_empty(&conn->ibc_early_rxs)) {
1800                 rx = list_entry(conn->ibc_early_rxs.next,
1801                                 kib_rx_t, rx_list);
1802                 list_del(&rx->rx_list);
1803                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1804                 
1805                 kibnal_handle_rx(rx);
1806                 
1807                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1808         }
1809         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1810 }
1811
1812 void
1813 kibnal_conn_disconnected(kib_conn_t *conn)
1814 {
1815         LIST_HEAD        (zombies); 
1816         struct list_head *tmp;
1817         struct list_head *nxt;
1818         kib_tx_t         *tx;
1819
1820         /* I'm the connd */
1821         LASSERT (!in_interrupt());
1822         LASSERT (current == kibnal_data.kib_connd);
1823         LASSERT (conn->ibc_state >= IBNAL_CONN_INIT);
1824         
1825         kibnal_set_conn_state(conn, IBNAL_CONN_DISCONNECTED);
1826
1827         /* move QP to error state to make posted work items complete */
1828         kibnal_set_qp_state(conn, vv_qp_state_error);
1829
1830         spin_lock(&conn->ibc_lock);
1831
1832         /* Complete all tx descs not waiting for sends to complete.
1833          * NB we should be safe from RDMA now that the QP has changed state */
1834
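        /* A tx with tx_sending != 0 stays on its list here; the send
         * completion path is expected to call kibnal_tx_done() on it once
         * tx_sending drops to zero (kibnal_tx_done() insists on that) */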
1835         list_for_each_safe (tmp, nxt, &conn->ibc_tx_queue) {
1836                 tx = list_entry (tmp, kib_tx_t, tx_list);
1837
1838                 tx->tx_status = -ECONNABORTED;
1839                 tx->tx_waiting = 0;
1840                 
1841                 if (tx->tx_sending != 0)
1842                         continue;
1843
1844                 list_del (&tx->tx_list);
1845                 list_add (&tx->tx_list, &zombies);
1846         }
1847
1848         list_for_each_safe (tmp, nxt, &conn->ibc_active_txs) {
1849                 tx = list_entry (tmp, kib_tx_t, tx_list);
1850
1851                 LASSERT (tx->tx_waiting ||
1852                          tx->tx_sending != 0);
1853
1854                 tx->tx_status = -ECONNABORTED;
1855                 tx->tx_waiting = 0;
1856                 
1857                 if (tx->tx_sending != 0)
1858                         continue;
1859
1860                 list_del (&tx->tx_list);
1861                 list_add (&tx->tx_list, &zombies);
1862         }
1863         
1864         spin_unlock(&conn->ibc_lock);
1865
1866         while (!list_empty(&zombies)) {
1867                 tx = list_entry (zombies.next, kib_tx_t, tx_list);
1868
1869                 list_del(&tx->tx_list);
1870                 kibnal_tx_done (tx);
1871         }
1872
1873         kibnal_handle_early_rxs(conn);
1874 }
1875
1876 void
1877 kibnal_peer_connect_failed (kib_peer_t *peer, int active)
1878 {
1879         LIST_HEAD        (zombies);     /* initialised: tested with list_empty() below */
1880         kib_tx_t         *tx;
1881         unsigned long     flags;
1882
1883         /* Only the connd creates conns => single threaded */
1884         LASSERT (!in_interrupt());
1885         LASSERT (current == kibnal_data.kib_connd);
1886         LASSERT (peer->ibp_reconnect_interval >= IBNAL_MIN_RECONNECT_INTERVAL);
1887
1888         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1889
1890         if (active) {
1891                 LASSERT (peer->ibp_connecting != 0);
1892                 peer->ibp_connecting--;
1893         } else {
1894                 LASSERT (!kibnal_peer_active(peer));
1895         }
1896         
1897         if (peer->ibp_connecting != 0) {
1898                 /* another connection attempt under way (loopback?)... */
1899                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1900                 return;
1901         }
1902
1903         if (list_empty(&peer->ibp_conns)) {
1904                 /* Say when active connection can be re-attempted */
1905                 peer->ibp_reconnect_time = jiffies + peer->ibp_reconnect_interval;
1906                 /* Increase reconnection interval */
1907                 peer->ibp_reconnect_interval = MIN (peer->ibp_reconnect_interval * 2,
1908                                                     IBNAL_MAX_RECONNECT_INTERVAL);
1909         
1910                 /* Take peer's blocked transmits to complete with error */
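                /* NB the list_add() + list_del_init() pair below steals the
                 * whole ibp_tx_queue in one go: 'zombies' is spliced in as
                 * the new list head, then the old head is removed, leaving
                 * 'zombies' heading the peer's former tx queue */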
1911                 list_add(&zombies, &peer->ibp_tx_queue);
1912                 list_del_init(&peer->ibp_tx_queue);
1913                 
1914                 if (kibnal_peer_active(peer) &&
1915                     (peer->ibp_persistence == 0)) {
1916                         /* failed connection attempt on non-persistent peer */
1917                         kibnal_unlink_peer_locked (peer);
1918                 }
1919         } else {
1920                 /* Can't have blocked transmits if there are connections */
1921                 LASSERT (list_empty(&peer->ibp_tx_queue));
1922         }
1923         
1924         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1925
1926         if (list_empty (&zombies)) 
1927                 return;
1928         
1929         CERROR ("Deleting messages for "LPX64": connection failed\n", peer->ibp_nid);
1930         do {
1931                 tx = list_entry (zombies.next, kib_tx_t, tx_list);
1932
1933                 list_del (&tx->tx_list);
1934                 /* complete now */
1935                 tx->tx_status = -EHOSTUNREACH;
1936                 kibnal_tx_done (tx);
1937         } while (!list_empty (&zombies));
1938 }
1939
1940 void
1941 kibnal_connreq_done(kib_conn_t *conn, int active, int status)
1942 {
1943         static cm_reject_data_t   rej;
1944
1945         struct list_head   txs;
1946         kib_peer_t        *peer = conn->ibc_peer;
1947         kib_peer_t        *peer2;
1948         unsigned long      flags;
1949         kib_tx_t          *tx;
1950
1951         /* Only the connd creates conns => single threaded */
1952         LASSERT (!in_interrupt());
1953         LASSERT (current == kibnal_data.kib_connd);
1954         LASSERT (conn->ibc_state < IBNAL_CONN_ESTABLISHED);
1955
1956         if (active) {
1957                 LASSERT (peer->ibp_connecting > 0);
1958         } else {
1959                 LASSERT (!kibnal_peer_active(peer));
1960         }
1961         
1962         PORTAL_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
1963         conn->ibc_connvars = NULL;
1964
1965         if (status != 0) {
1966                 /* failed to establish connection */
1967                 switch (conn->ibc_state) {
1968                 default:
1969                         LBUG();
1970                 case IBNAL_CONN_ACTIVE_CHECK_REPLY:
1971                         /* got a connection reply but failed checks */
1972                         LASSERT (active);
1973                         memset(&rej, 0, sizeof(rej));
1974                         rej.reason = cm_rej_code_usr_rej;
1975                         cm_reject(conn->ibc_cep, &rej);
1976                         break;
1977
1978                 case IBNAL_CONN_ACTIVE_CONNECT:
1979                         LASSERT (active);
1980                         cm_cancel(conn->ibc_cep);
1981                         kibnal_pause(HZ/10);
1982                         /* cm_connect() failed immediately or
1983                          * callback returned failure */
1984                         break;
1985
1986                 case IBNAL_CONN_ACTIVE_ARP:
1987                         LASSERT (active);
1988                         /* ibat_get_ib_data() failed immediately 
1989                          * or callback returned failure */
1990                         break;
1991
1992                 case IBNAL_CONN_INIT:
1993                         break;
1994
1995                 case IBNAL_CONN_PASSIVE_WAIT:
1996                         LASSERT (!active);
1997                         /* cm_accept callback returned failure */
1998                         break;
1999                 }
2000
2001                 kibnal_peer_connect_failed(conn->ibc_peer, active);
2002                 kibnal_conn_disconnected(conn);
2003                 return;
2004         }
2005
2006         /* connection established */
2007         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2008
2009         if (active) {
2010                 LASSERT(conn->ibc_state == IBNAL_CONN_ACTIVE_RTU);
2011         } else {
2012                 LASSERT(conn->ibc_state == IBNAL_CONN_PASSIVE_WAIT);
2013         }
2014         
2015         kibnal_set_conn_state(conn, IBNAL_CONN_ESTABLISHED);
2016
2017         if (!active) {
2018                 peer2 = kibnal_find_peer_locked(peer->ibp_nid);
2019                 if (peer2 != NULL) {
2020                         /* already in the peer table; swap */
2021                         conn->ibc_peer = peer2;
2022                         kibnal_peer_addref(peer2);
2023                         kibnal_peer_decref(peer);
2024                         peer = conn->ibc_peer;
2025                 } else {
2026                         /* add 'peer' to the peer table */
2027                         kibnal_peer_addref(peer);
2028                         list_add_tail(&peer->ibp_list,
2029                                       kibnal_nid2peerlist(peer->ibp_nid));
2030                 }
2031         }
2032         
2033         /* Add conn to peer's list and nuke any dangling conns from a different
2034          * peer instance... */
2035         kibnal_conn_addref(conn);               /* +1 ref for ibc_list */
2036         list_add(&conn->ibc_list, &peer->ibp_conns);
2037         kibnal_close_stale_conns_locked (conn->ibc_peer,
2038                                          conn->ibc_incarnation);
2039
2040         if (!kibnal_peer_active(peer) ||        /* peer has been deleted */
2041             conn->ibc_comms_error != 0 ||       /* comms error */
2042             conn->ibc_disconnect) {             /* need to disconnect */
2043                 
2044                 /* start to shut down connection */
2045                 kibnal_close_conn_locked(conn, -ECONNABORTED);
2046
2047                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2048                 kibnal_peer_connect_failed(peer, active);
2049                 return;
2050         }
2051
2052         if (active)
2053                 peer->ibp_connecting--;
2054
2055         /* grab pending txs while I have the lock */
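        /* (same whole-list steal as in kibnal_peer_connect_failed()) */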
2056         list_add(&txs, &peer->ibp_tx_queue);
2057         list_del_init(&peer->ibp_tx_queue);
2058         
2059         /* reset reconnect interval for next attempt */
2060         peer->ibp_reconnect_interval = IBNAL_MIN_RECONNECT_INTERVAL;
2061         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2062
2063         /* Schedule blocked txs */
2064         spin_lock (&conn->ibc_lock);
2065         while (!list_empty (&txs)) {
2066                 tx = list_entry (txs.next, kib_tx_t, tx_list);
2067                 list_del (&tx->tx_list);
2068
2069                 kibnal_queue_tx_locked (tx, conn);
2070         }
2071         spin_unlock (&conn->ibc_lock);
2072         kibnal_check_sends (conn);
2073
2074         /* schedule blocked rxs */
2075         kibnal_handle_early_rxs(conn);
2076 }
2077
2078 void
2079 kibnal_cm_callback(cm_cep_handle_t cep, cm_conn_data_t *cmdata, void *arg)
2080 {
2081         static cm_dreply_data_t drep;           /* just zeroed space */
2082         
2083         kib_conn_t             *conn = (kib_conn_t *)arg;
2084         unsigned long           flags;
2085         
2086         /* CAVEAT EMPTOR: tasklet context */
2087
2088         switch (cmdata->status) {
2089         default:
2090                 LBUG();
2091                 
2092         case cm_event_disconn_request:
2093                 /* IBNAL_CONN_ACTIVE_RTU:  gets closed in kibnal_connreq_done
2094                  * IBNAL_CONN_ESTABLISHED: I start it closing
2095                  * otherwise:              it's closing anyway */
2096                 cm_disconnect(conn->ibc_cep, NULL, &drep);
2097                 cm_cancel(conn->ibc_cep);
2098
2099                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2100                 LASSERT (!conn->ibc_disconnect);
2101                 conn->ibc_disconnect = 1;
2102
2103                 switch (conn->ibc_state) {
2104                 default:
2105                         LBUG();
2106
2107                 case IBNAL_CONN_ACTIVE_RTU:
2108                         /* kibnal_connreq_done is getting there; it'll see
2109                          * ibc_disconnect set... */
2110                         kibnal_conn_decref(conn); /* lose my ref */
2111                         break;
2112
2113                 case IBNAL_CONN_ESTABLISHED:
2114                         /* kibnal_connreq_done got there already; get
2115                          * disconnect going... */
2116                         kibnal_close_conn_locked(conn, 0);
2117                         kibnal_conn_decref(conn); /* lose my ref */
2118                         break;
2119
2120                 case IBNAL_CONN_DISCONNECT1:
2121                         /* kibnal_terminate_conn is getting there; it'll see
2122                          * ibc_disconnect set... */
2123                         kibnal_conn_decref(conn); /* lose my ref */
2124                         break;
2125
2126                 case IBNAL_CONN_DISCONNECT2:
2127                         /* kibnal_terminate_conn got there already; complete
2128                          * the disconnect.  NB kib_connd_conns takes my ref */
2129                         spin_lock(&kibnal_data.kib_connd_lock);
2130                         list_add_tail(&conn->ibc_list, &kibnal_data.kib_connd_conns);
2131                         wake_up(&kibnal_data.kib_connd_waitq);
2132                         spin_unlock(&kibnal_data.kib_connd_lock);
2133                         break;
2134                 }
2135                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2136                 return;
2137                 
2138         case cm_event_disconn_timeout:
2139         case cm_event_disconn_reply:
2140                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2141                 LASSERT (conn->ibc_state == IBNAL_CONN_DISCONNECT2);
2142                 LASSERT (!conn->ibc_disconnect);
2143                 conn->ibc_disconnect = 1;
2144
2145                 /* kibnal_terminate_conn sent the disconnect request.  
2146                  * NB kib_connd_conns takes my ref */
2147                 spin_lock(&kibnal_data.kib_connd_lock);
2148                 list_add_tail(&conn->ibc_list, &kibnal_data.kib_connd_conns);
2149                 wake_up(&kibnal_data.kib_connd_waitq);
2150                 spin_unlock(&kibnal_data.kib_connd_lock);
2151
2152                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2153                 break;
2154                 
2155         case cm_event_connected:
2156         case cm_event_conn_timeout:
2157         case cm_event_conn_reject:
2158                 LASSERT (conn->ibc_state == IBNAL_CONN_PASSIVE_WAIT);
2159                 conn->ibc_connvars->cv_conndata = *cmdata;
2160                 
2161                 spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2162                 list_add_tail(&conn->ibc_list, &kibnal_data.kib_connd_conns);
2163                 wake_up(&kibnal_data.kib_connd_waitq);
2164                 spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
2165                 break;
2166         }
2167 }
2168
2169 void
2170 kibnal_check_passive_wait(kib_conn_t *conn)
2171 {
2172         int     rc;
2173
2174         switch (conn->ibc_connvars->cv_conndata.status) {
2175         default:
2176                 LBUG();
2177                 
2178         case cm_event_connected:
2179                 kibnal_conn_addref(conn); /* ++ ref for CM callback */
2180                 rc = kibnal_set_qp_state(conn, vv_qp_state_rts);
2181                 if (rc != 0)
2182                         conn->ibc_comms_error = rc;
2183                 /* connection _has_ been established; it's just that we've had
2184                  * an error immediately... */
2185                 kibnal_connreq_done(conn, 0, 0);
2186                 break;
2187                 
2188         case cm_event_conn_timeout:
2189                 kibnal_connreq_done(conn, 0, -ETIMEDOUT);
2190                 break;
2191                 
2192         case cm_event_conn_reject:
2193                 kibnal_connreq_done(conn, 0, -ECONNRESET);
2194                 break;
2195         }
2196 }
2197
2198 void
2199 kibnal_recv_connreq(cm_cep_handle_t *cep, cm_request_data_t *cmreq)
2200 {
2201         static cm_reply_data_t  reply;
2202         static cm_reject_data_t reject;
2203
2204         kib_msg_t          *rxmsg = (kib_msg_t *)cmreq->priv_data;
2205         kib_msg_t          *txmsg;
2206         kib_conn_t         *conn = NULL;
2207         int                 rc = 0;
2208         kib_connvars_t     *cv;
2209         kib_peer_t         *tmp_peer;
2210         cm_return_t         cmrc;
2211         vv_return_t         vvrc;
2212         
2213         /* I'm the connd executing in thread context
2214          * No concurrency problems with static data! */
2215         LASSERT (!in_interrupt());
2216         LASSERT (current == kibnal_data.kib_connd);
2217
2218         if (cmreq->sid != IBNAL_SERVICE_NUMBER) {
2219                 CERROR(LPX64" != IBNAL_SERVICE_NUMBER("LPX64")\n",
2220                        cmreq->sid, (__u64)IBNAL_SERVICE_NUMBER);
2221                 goto reject;
2222         }
2223
2224         rc = kibnal_unpack_msg(rxmsg, cm_REQ_priv_data_len);
2225         if (rc != 0) {
2226                 CERROR("Can't parse connection request: %d\n", rc);
2227                 goto reject;
2228         }
2229
2230         if (rxmsg->ibm_type != IBNAL_MSG_CONNREQ) {
2231                 CERROR("Unexpected connreq msg type: %x from "LPX64"\n",
2232                        rxmsg->ibm_type, rxmsg->ibm_srcnid);
2233                 goto reject;
2234         }
2235
2236         if (rxmsg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid) {
2237                 CERROR("Can't accept "LPX64": bad dst nid "LPX64"\n",
2238                        rxmsg->ibm_srcnid, rxmsg->ibm_dstnid);
2239                 goto reject;
2240         }
2241
2242         if (rxmsg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
2243                 CERROR("Can't accept "LPX64": incompatible queue depth %d (%d wanted)\n",
2244                        rxmsg->ibm_srcnid, rxmsg->ibm_u.connparams.ibcp_queue_depth, 
2245                        IBNAL_MSG_QUEUE_SIZE);
2246                 goto reject;
2247         }
2248
2249         if (rxmsg->ibm_u.connparams.ibcp_max_msg_size > IBNAL_MSG_SIZE) {
2250                 CERROR("Can't accept "LPX64": message size %d too big (%d max)\n",
2251                        rxmsg->ibm_srcnid, rxmsg->ibm_u.connparams.ibcp_max_msg_size, 
2252                        IBNAL_MSG_SIZE);
2253                 goto reject;
2254         }
2255                 
2256         if (rxmsg->ibm_u.connparams.ibcp_max_frags > IBNAL_MAX_RDMA_FRAGS) {
2257                 CERROR("Can't accept "LPX64": max frags %d too big (%d max)\n",
2258                        rxmsg->ibm_srcnid, rxmsg->ibm_u.connparams.ibcp_max_frags, 
2259                        IBNAL_MAX_RDMA_FRAGS);
2260                 goto reject;
2261         }
2262                 
2263         conn = kibnal_create_conn(cep);
2264         if (conn == NULL) {
2265                 CERROR("Can't create conn for "LPX64"\n", rxmsg->ibm_srcnid);
2266                 goto reject;
2267         }
2268         
2269         /* assume 'rxmsg->ibm_srcnid' is a new peer */
2270         tmp_peer = kibnal_create_peer (rxmsg->ibm_srcnid);
2271         if (tmp_peer == NULL) {
2272                 CERROR("Can't create tmp peer for "LPX64"\n", rxmsg->ibm_srcnid);
2273                 kibnal_conn_decref(conn);
2274                 conn = NULL;
2275                 goto reject;
2276         }
2277
2278         conn->ibc_peer = tmp_peer;              /* conn takes over my ref */
2279         conn->ibc_incarnation = rxmsg->ibm_srcstamp;
2280         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2281
2282         cv = conn->ibc_connvars;
2283
2284         cv->cv_txpsn          = cmreq->cep_data.start_psn;
2285         cv->cv_remote_qpn     = cmreq->cep_data.qpn;
2286         cv->cv_path           = cmreq->path_data.path;
2287         cv->cv_rnr_count      = cmreq->cep_data.rtr_retry_cnt;
2288         // XXX                  cmreq->cep_data.retry_cnt;
2289         cv->cv_port           = cmreq->cep_data.local_port_num;
2290
2291         vvrc = gid2gid_index(kibnal_data.kib_hca, cv->cv_port,
2292                              &cv->cv_path.sgid, &cv->cv_sgid_index);
2293         LASSERT (vvrc == vv_return_ok);
2294         
2295         vvrc = pkey2pkey_index(kibnal_data.kib_hca, cv->cv_port,
2296                                cv->cv_path.pkey, &cv->cv_pkey_index);
2297         LASSERT (vvrc == vv_return_ok);
2298
2299         rc = kibnal_set_qp_state(conn, vv_qp_state_init);
2300         if (rc != 0)
2301                 goto reject;
2302
2303         rc = kibnal_post_receives(conn);
2304         if (rc != 0) {
2305                 CERROR("Can't post receives for "LPX64"\n", rxmsg->ibm_srcnid);
2306                 goto reject;
2307         }
2308
2309         rc = kibnal_set_qp_state(conn, vv_qp_state_rtr);
2310         if (rc != 0)
2311                 goto reject;
2312         
2313         memset(&reply, 0, sizeof(reply));
2314         reply.qpn                 = cv->cv_local_qpn;
2315         reply.qkey                = IBNAL_QKEY;
2316         reply.start_psn           = cv->cv_rxpsn;
2317         reply.arb_initiator_depth = IBNAL_ARB_INITIATOR_DEPTH;
2318         reply.arb_resp_res        = IBNAL_ARB_RESP_RES;
2319         reply.failover_accepted   = IBNAL_FAILOVER_ACCEPTED;
2320         reply.rnr_retry_count     = cv->cv_rnr_count;
2321         reply.targ_ack_delay      = kibnal_data.kib_hca_attrs.ack_delay;
2322         
2323         txmsg = (kib_msg_t *)&reply.priv_data;
2324         kibnal_init_msg(txmsg, IBNAL_MSG_CONNACK, 
2325                         sizeof(txmsg->ibm_u.connparams));
2326         LASSERT (txmsg->ibm_nob <= cm_REP_priv_data_len);
2327         txmsg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
2328         txmsg->ibm_u.connparams.ibcp_max_msg_size = IBNAL_MSG_SIZE;
2329         txmsg->ibm_u.connparams.ibcp_max_frags = IBNAL_MAX_RDMA_FRAGS;
2330         kibnal_pack_msg(txmsg, 0, rxmsg->ibm_srcnid, rxmsg->ibm_srcstamp);
2331         
2332         kibnal_set_conn_state(conn, IBNAL_CONN_PASSIVE_WAIT);
2333         
2334         cmrc = cm_accept(conn->ibc_cep, &reply, NULL,
2335                          kibnal_cm_callback, conn);
2336
2337         if (cmrc == cm_stat_success)
2338                 return;                         /* callback has got my ref on conn */
2339
2340         /* back out state change (no callback happening) */
2341         kibnal_set_conn_state(conn, IBNAL_CONN_INIT);
2342         rc = -EIO;
2343                 
2344  reject:
2345         CERROR("Rejected connreq from "LPX64"\n", rxmsg->ibm_srcnid);
2346
2347         memset(&reject, 0, sizeof(reject));
2348         reject.reason = cm_rej_code_usr_rej;
2349         cm_reject(cep, &reject);
2350
2351         if (conn != NULL) {
2352                 LASSERT (rc != 0);
2353                 kibnal_connreq_done(conn, 0, rc);
2354         } else {
2355                 cm_destroy_cep(cep);
2356         }
2357 }
2358
2359 void
2360 kibnal_listen_callback(cm_cep_handle_t cep, cm_conn_data_t *data, void *arg)
2361 {
2362         cm_request_data_t  *cmreq = &data->data.request;
2363         kib_pcreq_t        *pcr;
2364         unsigned long       flags;
2365         
2366         LASSERT (arg == NULL);
2367
2368         if (data->status != cm_event_conn_request) {
2369                 CERROR("status %d is not cm_event_conn_request\n",
2370                        data->status);
2371                 return;
2372         }
2373
2374         PORTAL_ALLOC_ATOMIC(pcr, sizeof(*pcr));
2375         if (pcr == NULL) {
2376                 CERROR("Can't allocate passive connreq\n");
2377
2378                 cm_reject(cep, &((cm_reject_data_t) /* NB RO struct */
2379                                  {.reason = cm_rej_code_no_res,}));
2380                 cm_destroy_cep(cep);
2381                 return;
2382         }
2383
2384         pcr->pcr_cep = cep;
2385         pcr->pcr_cmreq = *cmreq;
2386         
2387         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2388
2389         list_add_tail(&pcr->pcr_list, &kibnal_data.kib_connd_pcreqs);
2390         wake_up(&kibnal_data.kib_connd_waitq);
2391         
2392         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
2393 }
2394
2395
2396 void
2397 kibnal_active_connect_callback (cm_cep_handle_t cep, cm_conn_data_t *cd, 
2398                                 void *arg)
2399 {
2400         /* CAVEAT EMPTOR: tasklet context */
2401         kib_conn_t       *conn = (kib_conn_t *)arg;
2402         kib_connvars_t   *cv = conn->ibc_connvars;
2403         unsigned long     flags;
2404
2405         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_CONNECT);
2406         cv->cv_conndata = *cd;
2407
2408         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2409         /* connd takes my ref */
2410         list_add_tail(&conn->ibc_list, &kibnal_data.kib_connd_conns);
2411         wake_up(&kibnal_data.kib_connd_waitq);
2412         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
2413 }
2414
2415 void
2416 kibnal_connect_conn (kib_conn_t *conn)
2417 {
2418         static cm_request_data_t  cmreq;
2419         kib_msg_t                *msg = (kib_msg_t *)&cmreq.priv_data;
2420         kib_connvars_t           *cv = conn->ibc_connvars;
2421         kib_peer_t               *peer = conn->ibc_peer;
2422         cm_return_t               cmrc;
2423         
2424         /* Only called by connd => statics OK */
2425         LASSERT (!in_interrupt());
2426         LASSERT (current == kibnal_data.kib_connd);
2427         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_ARP);
2428
2429         memset(&cmreq, 0, sizeof(cmreq));
2430         
2431         cmreq.sid = IBNAL_SERVICE_NUMBER;
2432
2433         cmreq.cep_data.ca_guid              = kibnal_data.kib_hca_attrs.guid;
2434         cmreq.cep_data.qpn                  = cv->cv_local_qpn;
2435         cmreq.cep_data.retry_cnt            = IBNAL_RETRY_CNT;
2436         cmreq.cep_data.rtr_retry_cnt        = IBNAL_RNR_CNT;
2437         cmreq.cep_data.start_psn            = cv->cv_rxpsn;
2438         cmreq.cep_data.end_to_end_flow_ctrl = IBNAL_EE_FLOW_CNT;
2439         // XXX ack_timeout?
2440         // offered_resp_res
2441         // offered_initiator_depth
2442
2443         cmreq.path_data.subn_local  = IBNAL_LOCAL_SUB;
2444         cmreq.path_data.path        = cv->cv_path;
2445         
2446         kibnal_init_msg(msg, IBNAL_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
2447         LASSERT(msg->ibm_nob <= cm_REQ_priv_data_len);
2448         msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
2449         msg->ibm_u.connparams.ibcp_max_msg_size = IBNAL_MSG_SIZE;
2450         msg->ibm_u.connparams.ibcp_max_frags = IBNAL_MAX_RDMA_FRAGS;
2451         kibnal_pack_msg(msg, 0, peer->ibp_nid, 0);
2452         
2453         CDEBUG(D_NET, "Connecting %p to "LPX64"\n", conn, peer->ibp_nid);
2454
2455         kibnal_conn_addref(conn);               /* ++ref for CM callback */
2456         kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_CONNECT);
2457
2458         cmrc = cm_connect(conn->ibc_cep, &cmreq, 
2459                           kibnal_active_connect_callback, conn);
2460         if (cmrc == cm_stat_success) {
2461                 CDEBUG(D_NET, "connection REQ sent to "LPX64"\n",
2462                        peer->ibp_nid);
2463                 return;
2464         }
2465
2466         CERROR ("Connect "LPX64" failed: %d\n", peer->ibp_nid, cmrc);
2467         kibnal_conn_decref(conn);       /* drop callback's ref */
2468         kibnal_connreq_done(conn, 1, -EHOSTUNREACH);
2469 }
2470
2471 void
2472 kibnal_check_connreply (kib_conn_t *conn)
2473 {
2474         static cm_rtu_data_t  rtu;
2475
2476         kib_connvars_t   *cv = conn->ibc_connvars;
2477         cm_reply_data_t  *reply = &cv->cv_conndata.data.reply;
2478         kib_msg_t        *msg = (kib_msg_t *)&reply->priv_data;
2479         kib_peer_t       *peer = conn->ibc_peer;
2480         cm_return_t       cmrc;
2481         cm_cep_handle_t   cep;
2482         unsigned long     flags;
2483         int               rc;
2484
2485         /* Only called by connd => statics OK */
2486         LASSERT (!in_interrupt());
2487         LASSERT (current == kibnal_data.kib_connd);
2488         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_CONNECT);
2489
2490         if (cv->cv_conndata.status == cm_event_conn_reply) {
2491                 cv->cv_remote_qpn = reply->qpn;
2492                 cv->cv_txpsn      = reply->start_psn;
2493                 // XXX              reply->targ_ack_delay;
2494                 cv->cv_rnr_count  = reply->rnr_retry_count;
2495
2496                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_CHECK_REPLY);
2497
2498                 rc = kibnal_unpack_msg(msg, cm_REP_priv_data_len);
2499                 if (rc != 0) {
2500                         CERROR("Can't unpack reply from "LPX64"\n",
2501                                peer->ibp_nid);
2502                         kibnal_connreq_done(conn, 1, rc);
2503                         return;
2504                 }
2505
2506                 if (msg->ibm_type != IBNAL_MSG_CONNACK) {
2507                         CERROR("Unexpected message type %d from "LPX64"\n",
2508                                msg->ibm_type, peer->ibp_nid);
2509                         kibnal_connreq_done(conn, 1, -EPROTO);
2510                         return;
2511                 }
2512
2513                 if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
2514                         CERROR(LPX64" has incompatible queue depth %d (%d wanted)\n",
2515                                peer->ibp_nid, msg->ibm_u.connparams.ibcp_queue_depth,
2516                                IBNAL_MSG_QUEUE_SIZE);
2517                         kibnal_connreq_done(conn, 1, -EPROTO);
2518                         return;
2519                 }
2520                 
2521                 if (msg->ibm_u.connparams.ibcp_max_msg_size > IBNAL_MSG_SIZE) {
2522                         CERROR(LPX64" max message size %d too big (%d max)\n",
2523                                peer->ibp_nid, msg->ibm_u.connparams.ibcp_max_msg_size, 
2524                                IBNAL_MSG_SIZE);
2525                         kibnal_connreq_done(conn, 1, -EPROTO);
2526                         return;
2527                 }
2528
2529                 if (msg->ibm_u.connparams.ibcp_max_frags > IBNAL_MAX_RDMA_FRAGS) {
2530                         CERROR(LPX64" max frags %d too big (%d max)\n",
2531                                peer->ibp_nid, msg->ibm_u.connparams.ibcp_max_frags, 
2532                                IBNAL_MAX_RDMA_FRAGS);
2533                         kibnal_connreq_done(conn, 1, -EPROTO);
2534                         return;
2535                 }
2536                 
2537                 read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2538                 rc = (msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid ||
2539                       msg->ibm_dststamp != kibnal_data.kib_incarnation) ?
2540                      -ESTALE : 0;
2541                 read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2542                 if (rc != 0) {
2543                         CERROR("Stale connection reply from "LPX64"\n",
2544                                peer->ibp_nid);
2545                         kibnal_connreq_done(conn, 1, rc);
2546                         return;
2547                 }
2548
2549                 conn->ibc_incarnation = msg->ibm_srcstamp;
2550                 conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2551                 
2552                 rc = kibnal_post_receives(conn);
2553                 if (rc != 0) {
2554                         CERROR("Can't post receives for "LPX64"\n",
2555                                peer->ibp_nid);
2556                         kibnal_connreq_done(conn, 1, rc);
2557                         return;
2558                 }
2559                 
2560                 rc = kibnal_set_qp_state(conn, vv_qp_state_rtr);
2561                 if (rc != 0) {
2562                         kibnal_connreq_done(conn, 1, rc);
2563                         return;
2564                 }
2565                 
2566                 rc = kibnal_set_qp_state(conn, vv_qp_state_rts);
2567                 if (rc != 0) {
2568                         kibnal_connreq_done(conn, 1, rc);
2569                         return;
2570                 }
2571                 
2572                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_RTU);
2573                 kibnal_conn_addref(conn);       /* ++for CM callback */
2574                 
2575                 memset(&rtu, 0, sizeof(rtu));
2576                 cmrc = cm_accept(conn->ibc_cep, NULL, &rtu,
2577                                  kibnal_cm_callback, conn);
2578                 if (cmrc == cm_stat_success) {
2579                         /* Now I'm racing with disconnect signalled by
2580                          * kibnal_cm_callback */
2581                         kibnal_connreq_done(conn, 1, 0);
2582                         return;
2583                 }
2584
2585                 CERROR("cm_accept "LPX64" failed: %d\n", peer->ibp_nid, cmrc);
2586                 /* Back out of RTU: no callback coming */
2587                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_CHECK_REPLY);
2588                 kibnal_conn_decref(conn);
2589                 kibnal_connreq_done(conn, 1, -EIO);
2590                 return;
2591         }
2592
2593         if (cv->cv_conndata.status == cm_event_conn_reject) {
2594
2595                 if (cv->cv_conndata.data.reject.reason != cm_rej_code_stale_conn) {
2596                         CERROR("conn -> "LPX64" rejected: %d\n", peer->ibp_nid,
2597                                cv->cv_conndata.data.reject.reason);
2598                         kibnal_connreq_done(conn, 1, -ECONNREFUSED);
2599                         return;
2600                 }
2601
2602                 CWARN ("conn -> "LPX64" stale: retrying\n", peer->ibp_nid);
2603
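                /* Presumably the rejected CEP can't be reused: create a fresh
                 * one, then cancel and destroy the old one before retrying */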
2604                 cep = cm_create_cep(cm_cep_transp_rc);
2605                 if (cep == NULL) {
2606                         CERROR("Can't create new CEP\n");
2607                         kibnal_connreq_done(conn, 1, -ENOMEM);
2608                         return;
2609                 }
2610
2611                 cmrc = cm_cancel(conn->ibc_cep);
2612                 LASSERT (cmrc == cm_stat_success);
2613                 cmrc = cm_destroy_cep(conn->ibc_cep);
2614                 LASSERT (cmrc == cm_stat_success);
2615
2616                 conn->ibc_cep = cep;
2617
2618                 /* retry connect */
2619                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_ARP);
2620                 kibnal_connect_conn(conn);
2621                 return;
2622         }
2623
2624         CERROR("conn -> "LPX64" failed: %d\n", peer->ibp_nid,
2625                cv->cv_conndata.status);
2626         kibnal_connreq_done(conn, 1, -ECONNABORTED);
2627 }
2628
2629 void
2630 kibnal_send_connreq (kib_conn_t *conn)
2631 {
2632         kib_peer_t           *peer = conn->ibc_peer;
2633         kib_connvars_t       *cv = conn->ibc_connvars;
2634         ibat_arp_data_t      *arp = &cv->cv_arp;
2635         ib_path_record_v2_t  *path = &cv->cv_path;
2636         vv_return_t           vvrc;
2637         int                   rc;
2638
2639         /* Only called by connd => statics OK */
2640         LASSERT (!in_interrupt());
2641         LASSERT (current == kibnal_data.kib_connd);
2642         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_ARP);
2643         
2644         if (cv->cv_arprc != ibat_stat_ok) {
2645                 CERROR("Can't Arp "LPX64"@%u.%u.%u.%u: %d\n", peer->ibp_nid,
2646                        HIPQUAD(peer->ibp_ip), cv->cv_arprc);
2647                 kibnal_connreq_done(conn, 1, -ENETUNREACH);
2648                 return;
2649         }
2650
2651         if ((arp->mask & IBAT_PRI_PATH_VALID) != 0) {
2652                 CDEBUG(D_NET, "Got valid path for "LPX64"\n", peer->ibp_nid);
2653
2654                 *path = *arp->primary_path;
2655
2656                 vvrc = base_gid2port_num(kibnal_data.kib_hca, &path->sgid,
2657                                          &cv->cv_port);
2658                 LASSERT (vvrc == vv_return_ok);
2659
2660                 vvrc = gid2gid_index(kibnal_data.kib_hca, cv->cv_port,
2661                                      &path->sgid, &cv->cv_sgid_index);
2662                 LASSERT (vvrc == vv_return_ok);
2663
2664                 vvrc = pkey2pkey_index(kibnal_data.kib_hca, cv->cv_port,
2665                                        path->pkey, &cv->cv_pkey_index);
2666                 LASSERT (vvrc == vv_return_ok);
2667
2668                 path->mtu = IBNAL_IB_MTU;
2669
2670         } else if ((arp->mask & IBAT_LID_VALID) != 0) {
2671                 CWARN("Creating new path record for "LPX64"@%u.%u.%u.%u\n",
2672                       peer->ibp_nid, HIPQUAD(peer->ibp_ip));
2673
2674                 cv->cv_pkey_index = IBNAL_PKEY_IDX;
2675                 cv->cv_sgid_index = IBNAL_SGID_IDX;
2676                 cv->cv_port = arp->local_port_num;
2677
2678                 memset(path, 0, sizeof(*path));
2679
2680                 vvrc = port_num2base_gid(kibnal_data.kib_hca, cv->cv_port,
2681                                          &path->sgid);
2682                 LASSERT (vvrc == vv_return_ok);
2683
2684                 vvrc = port_num2base_lid(kibnal_data.kib_hca, cv->cv_port,
2685                                          &path->slid);
2686                 LASSERT (vvrc == vv_return_ok);
2687
2688                 path->dgid          = arp->gid;
2689                 path->sl            = IBNAL_SERVICE_LEVEL;
2690                 path->dlid          = arp->lid;
2691                 path->mtu           = IBNAL_IB_MTU;
2692                 path->rate          = IBNAL_STATIC_RATE;
2693                 path->pkt_life_time = IBNAL_PKT_LIFETIME;
2694                 path->pkey          = IBNAL_PKEY;
2695                 path->traffic_class = IBNAL_TRAFFIC_CLASS;
2696         } else {
2697                 CERROR("Can't Arp "LPX64"@%u.%u.%u.%u: no PATH or LID\n", 
2698                        peer->ibp_nid, HIPQUAD(peer->ibp_ip));
2699                 kibnal_connreq_done(conn, 1, -ENETUNREACH);
2700                 return;
2701         }
2702
2703         rc = kibnal_set_qp_state(conn, vv_qp_state_init);
2704         if (rc != 0) {
2705                 kibnal_connreq_done(conn, 1, rc);
2706                 return;
2707         }
2708         /* do the actual connection request */
2709         kibnal_connect_conn(conn);
2710 }
2711
2712 void
2713 kibnal_arp_callback (ibat_stat_t arprc, ibat_arp_data_t *arp_data, void *arg)
2714 {
2715         /* CAVEAT EMPTOR: tasklet context */
2716         kib_conn_t      *conn = (kib_conn_t *)arg;
2717         kib_peer_t      *peer = conn->ibc_peer;
2718         unsigned long    flags;
2719
2720         CDEBUG(D_NET, "ARP "LPX64"@%u.%u.%u.%u rc %d LID %s PATH %s\n",
2721                peer->ibp_nid, HIPQUAD(peer->ibp_ip), arprc,
2722                (arp_data->mask & IBAT_LID_VALID) == 0 ? "invalid" : "valid",
2723                (arp_data->mask & IBAT_PRI_PATH_VALID) == 0 ? "invalid" : "valid");
2724         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_ARP);
2725
2726         conn->ibc_connvars->cv_arprc = arprc;
2727         conn->ibc_connvars->cv_arp = *arp_data;
2728         
2729         /* connd takes over my ref on conn */
2730         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2731         
2732         list_add_tail(&conn->ibc_list, &kibnal_data.kib_connd_conns);
2733         wake_up(&kibnal_data.kib_connd_waitq);
2734         
2735         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
2736 }
2737
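     /* Start active connection establishment to 'peer': create the CM
      * endpoint and the conn, then kick off an asynchronous IP-to-IB
      * address resolution.  An ARP cache hit returns immediately and the
      * connection request is sent on the spot; otherwise
      * kibnal_arp_callback() takes over when the lookup completes. */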
2738 void
2739 kibnal_arp_peer (kib_peer_t *peer)
2740 {
2741         cm_cep_handle_t  cep;
2742         kib_conn_t      *conn;
2743         int              ibatrc;
2744
2745         /* Only the connd does this (i.e. single threaded) */
2746         LASSERT (current == kibnal_data.kib_connd);
2747         LASSERT (peer->ibp_connecting != 0);
2748
2749         cep = cm_create_cep(cm_cep_transp_rc);
2750         if (cep == NULL) {
2751                 CERROR ("Can't create cep for conn->"LPX64"\n",
2752                         peer->ibp_nid);
2753                 kibnal_peer_connect_failed(peer, 1);
2754                 return;
2755         }
2756
2757         conn = kibnal_create_conn(cep);
2758         if (conn == NULL) {
2759                 CERROR ("Can't allocate conn->"LPX64"\n",
2760                         peer->ibp_nid);
2761                 cm_destroy_cep(cep);
2762                 kibnal_peer_connect_failed(peer, 1);
2763                 return;
2764         }
2765
2766         conn->ibc_peer = peer;
2767         kibnal_peer_addref(peer);
2768
2769         kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_ARP);
2770
2771         ibatrc = ibat_get_ib_data(htonl(peer->ibp_ip), INADDR_ANY, 
2772                                   ibat_paths_primary,
2773                                   &conn->ibc_connvars->cv_arp, 
2774                                   kibnal_arp_callback, conn, 0);
2775         CDEBUG(D_NET,"ibatrc %d\n", ibatrc);
2776         switch (ibatrc) {
2777         default:
2778                 LBUG();
2779                 
2780         case ibat_stat_pending:
2781                 /* NB callback has my ref on conn */
2782                 break;
2783                 
2784         case ibat_stat_ok:
2785                 /* Immediate return (ARP cache hit) == no callback. */
2786                 kibnal_send_connreq(conn);
2787                 kibnal_conn_decref(conn);
2788                 break;
2789
2790         case ibat_stat_error:
2791         case ibat_stat_timeout:
2792         case ibat_stat_not_found:
2793                 CERROR("ARP "LPX64"@%u.%u.%u.%u failed: %d\n", peer->ibp_nid,
2794                        HIPQUAD(peer->ibp_ip), ibatrc);
2795                 kibnal_connreq_done(conn, 1, -ENETUNREACH);
2796                 kibnal_conn_decref(conn);
2797                 break;
2798         }
2799 }
2800
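     /* Return non-zero if any tx queued on, or actively sending over,
      * 'conn' has passed its deadline. */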
2801 int
2802 kibnal_conn_timed_out (kib_conn_t *conn)
2803 {
2804         kib_tx_t          *tx;
2805         struct list_head  *ttmp;
2806
2807         spin_lock(&conn->ibc_lock);
2808
2809         list_for_each (ttmp, &conn->ibc_tx_queue) {
2810                 tx = list_entry (ttmp, kib_tx_t, tx_list);
2811
2812                 if (time_after_eq (jiffies, tx->tx_deadline)) {
2813                         spin_unlock(&conn->ibc_lock);
2814                         return 1;
2815                 }
2816         }
2817
2818         list_for_each (ttmp, &conn->ibc_active_txs) {
2819                 tx = list_entry (ttmp, kib_tx_t, tx_list);
2820
2821                 LASSERT (tx->tx_waiting ||
2822                          tx->tx_sending != 0);
2823
2824                 if (time_after_eq (jiffies, tx->tx_deadline)) {
2825                         spin_unlock(&conn->ibc_lock);
2826                         return 1;
2827                 }
2828         }
2829
2830         spin_unlock(&conn->ibc_lock);
2831         return 0;
2832 }
2833
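     /* Scan peer hash bucket 'idx' for connections with timed-out
      * transmits.  The scan holds the global lock shared; when a victim
      * is found we take a ref, drop the lock to close the connection,
      * and restart the scan from the top of the bucket. */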
2834 void
2835 kibnal_check_conns (int idx)
2836 {
2837         struct list_head  *peers = &kibnal_data.kib_peers[idx];
2838         struct list_head  *ptmp;
2839         kib_peer_t        *peer;
2840         kib_conn_t        *conn;
2841         struct list_head  *ctmp;
2842         unsigned long      flags;
2843
2844  again:
2845         /* NB. We expect to scan all the peers and find no RDMAs timing
2846          * out, so we take the global lock shared for the scan and only
2847          * drop it below when a connection actually has to be closed. */
2848         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2849
2850         list_for_each (ptmp, peers) {
2851                 peer = list_entry (ptmp, kib_peer_t, ibp_list);
2852
2853                 list_for_each (ctmp, &peer->ibp_conns) {
2854                         conn = list_entry (ctmp, kib_conn_t, ibc_list);
2855
2856                         LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);
2857
2858                         /* In case we have enough credits to return via a
2859                          * NOOP, but there were no non-blocking tx descs
2860                          * free to do it last time... */
2861                         kibnal_check_sends(conn);
2862
2863                         if (!kibnal_conn_timed_out(conn))
2864                                 continue;
2865
2866                         /* Handle timeout by closing the whole connection.  We
2867                          * can only be sure RDMA activity has ceased once the
2868                          * QP has been modified. */
2869                         
2870                         kibnal_conn_addref(conn); /* 1 ref for me... */
2871
2872                         read_unlock_irqrestore(&kibnal_data.kib_global_lock, 
2873                                                flags);
2874
2875                         CERROR("Timed out RDMA with "LPX64"\n",
2876                                peer->ibp_nid);
2877
2878                         kibnal_close_conn (conn, -ETIMEDOUT);
2879                         kibnal_conn_decref(conn); /* ...until here */
2880
2881                         /* start again now I've dropped the lock */
2882                         goto again;
2883                 }
2884         }
2885
2886         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2887 }
2888
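     /* Run the disconnect protocol for 'conn' in connd context: if the
      * CM callback has already signalled disconnection, just clean up;
      * otherwise issue an active disconnect and either wait for the CM
      * callback or cancel and clean up synchronously. */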
2889 void
2890 kibnal_disconnect_conn (kib_conn_t *conn)
2891 {
2892         static cm_drequest_data_t dreq;         /* just for the space */
2893         
2894         cm_return_t    cmrc;
2895         unsigned long  flags;
2896
2897         LASSERT (!in_interrupt());
2898         LASSERT (current == kibnal_data.kib_connd);
2899         
2900         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2901
2902         if (conn->ibc_disconnect) {
2903                 /* Had the CM callback already */
2904                 write_unlock_irqrestore(&kibnal_data.kib_global_lock,
2905                                         flags);
2906                 kibnal_conn_disconnected(conn);
2907                 return;
2908         }
2909                 
2910         LASSERT (conn->ibc_state == IBNAL_CONN_DISCONNECT1);
2911
2912         /* active disconnect */
2913         cmrc = cm_disconnect(conn->ibc_cep, &dreq, NULL);
2914         if (cmrc == cm_stat_success) {
2915                 /* waiting for CM */
2916                 conn->ibc_state = IBNAL_CONN_DISCONNECT2;
2917                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2918                 return;
2919         }
2920
2921         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2922
2923         cm_cancel(conn->ibc_cep);
2924         kibnal_pause(HZ/10);
2925
2926         if (!conn->ibc_disconnect)              /* CM callback will never happen now */
2927                 kibnal_conn_decref(conn);
2928         
2929         LASSERT (atomic_read(&conn->ibc_refcount) > 0);
2930         LASSERT (conn->ibc_state == IBNAL_CONN_DISCONNECT1);
2931
2932         kibnal_conn_disconnected(conn);
2933 }
2934
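     /* Connection daemon: a single thread that reaps zombie conns,
      * handles passive connection requests, starts address resolution
      * for peers awaiting connections, steps conns through their
      * connect/disconnect state machines and periodically sweeps the
      * peer table for RDMA timeouts. */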
2935 int
2936 kibnal_connd (void *arg)
2937 {
2938         wait_queue_t       wait;
2939         unsigned long      flags;
2940         kib_pcreq_t       *pcr;
2941         kib_conn_t        *conn;
2942         kib_peer_t        *peer;
2943         int                timeout;
2944         int                i;
2945         int                dropped_lock;
2946         int                peer_index = 0;
2947         unsigned long      deadline = jiffies;
2948         
2949         kportal_daemonize ("kibnal_connd");
2950         kportal_blockallsigs ();
2951
2952         init_waitqueue_entry (&wait, current);
2953         kibnal_data.kib_connd = current;
2954
2955         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
2956
2957         while (!kibnal_data.kib_shutdown) {
2958
2959                 dropped_lock = 0;
2960
2961                 if (!list_empty (&kibnal_data.kib_connd_zombies)) {
2962                         conn = list_entry (kibnal_data.kib_connd_zombies.next,
2963                                            kib_conn_t, ibc_list);
2964                         list_del (&conn->ibc_list);
2965                         
2966                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2967                         dropped_lock = 1;
2968
2969                         kibnal_destroy_conn(conn);
2970
2971                         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
2972                 }
2973
2974                 if (!list_empty (&kibnal_data.kib_connd_pcreqs)) {
2975                         pcr = list_entry(kibnal_data.kib_connd_pcreqs.next,
2976                                          kib_pcreq_t, pcr_list);
2977                         list_del(&pcr->pcr_list);
2978                         
2979                         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
2980                         dropped_lock = 1;
2981
2982                         kibnal_recv_connreq(pcr->pcr_cep, &pcr->pcr_cmreq);
2983                         PORTAL_FREE(pcr, sizeof(*pcr));
2984
2985                         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2986                 }
2987                         
2988                 if (!list_empty (&kibnal_data.kib_connd_peers)) {
2989                         peer = list_entry (kibnal_data.kib_connd_peers.next,
2990                                            kib_peer_t, ibp_connd_list);
2991                         
2992                         list_del_init (&peer->ibp_connd_list);
2993                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2994                         dropped_lock = 1;
2995
2996                         kibnal_arp_peer (peer);
2997                         kibnal_peer_decref (peer);
2998
2999                         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
3000                 }
3001
3002                 if (!list_empty (&kibnal_data.kib_connd_conns)) {
3003                         conn = list_entry (kibnal_data.kib_connd_conns.next,
3004                                            kib_conn_t, ibc_list);
3005                         list_del (&conn->ibc_list);
3006                         
3007                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
3008                         dropped_lock = 1;
3009
3010                         switch (conn->ibc_state) {
3011                         default:
3012                                 LBUG();
3013                                 
3014                         case IBNAL_CONN_ACTIVE_ARP:
3015                                 kibnal_send_connreq(conn);
3016                                 break;
3017
3018                         case IBNAL_CONN_ACTIVE_CONNECT:
3019                                 kibnal_check_connreply(conn);
3020                                 break;
3021
3022                         case IBNAL_CONN_PASSIVE_WAIT:
3023                                 kibnal_check_passive_wait(conn);
3024                                 break;
3025
3026                         case IBNAL_CONN_DISCONNECT1:
3027                         case IBNAL_CONN_DISCONNECT2:
3028                                 kibnal_disconnect_conn(conn);
3029                                 break;
3030                         }
3031                         kibnal_conn_decref(conn);
3032
3033                         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
3034                 }
3035
3036                 /* careful with the jiffy wrap... */
3037                 timeout = (int)(deadline - jiffies);
3038                 if (timeout <= 0) {
3039                         const int n = 4;
3040                         const int p = 1;
3041                         int       chunk = kibnal_data.kib_peer_hash_size;
3042                         
3043                         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
3044                         dropped_lock = 1;
3045
3046                         /* Time to check for RDMA timeouts on a few more
3047                          * peers: I do checks every 'p' seconds on a
3048                          * proportion of the peer table and I need to check
3049                          * every connection 'n' times within a timeout
3050                          * interval, to ensure I detect a timeout on any
3051                          * connection within (n+1)/n times the timeout
3052                          * interval. */
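                             /* Illustrative example (values assumed, not
                              * taken from this file): with a 101-bucket
                              * peer table, n = 4, p = 1 and a 60s
                              * kib_io_timeout, chunk = (101 * 4 * 1) / 60
                              * = 6, so 6 buckets are swept per second and
                              * the whole table roughly every 17s: nearly
                              * n = 4 sweeps per timeout interval. */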
3053
3054                         if (kibnal_tunables.kib_io_timeout > n * p)
3055                                 chunk = (chunk * n * p) / 
3056                                         kibnal_tunables.kib_io_timeout;
3057                         if (chunk == 0)
3058                                 chunk = 1;
3059
3060                         for (i = 0; i < chunk; i++) {
3061                                 kibnal_check_conns (peer_index);
3062                                 peer_index = (peer_index + 1) % 
3063                                              kibnal_data.kib_peer_hash_size;
3064                         }
3065
3066                         deadline += p * HZ;
3067                         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
3068                 }
3069
3070                 if (dropped_lock)
3071                         continue;
3072                 
3073                 /* Nothing to do: sleep until woken or 'timeout' jiffies pass */
3074                 set_current_state (TASK_INTERRUPTIBLE);
3075                 add_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
3076                 spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
3077
3078                 schedule_timeout (timeout);
3079
3080                 set_current_state (TASK_RUNNING);
3081                 remove_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
3082                 spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
3083         }
3084
3085         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
3086
3087         kibnal_thread_fini ();
3088         return (0);
3089 }
3090
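     /* HCA asynchronous event handler: events are just logged. */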
3091 void 
3092 kibnal_async_callback(vv_event_record_t ev)
3093 {
3094         CERROR("type: %d, port: %d, data: "LPX64"\n", 
3095                ev.event_type, ev.port_num, ev.type.data);
3096 }
3097
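     /* CQ event handler: may be called in interrupt context (hence the
      * irqsave), so no polling here; just flag the CQ ready and wake a
      * scheduler thread, which does the actual polling. */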
3098 void
3099 kibnal_cq_callback (unsigned long unused_context)
3100 {
3101         unsigned long    flags;
3102
3103         CDEBUG(D_NET, "!!\n");
3104
3105         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
3106         kibnal_data.kib_ready = 1;
3107         wake_up(&kibnal_data.kib_sched_waitq);
3108         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
3109 }
3110
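     /* Scheduler thread: polls the CQ for completions and dispatches
      * them.  The kib_checking_cq flag ensures only one scheduler polls
      * at a time; on getting a completion the poller wakes another
      * scheduler to carry on polling while it handles the work, and when
      * the CQ is empty it re-arms the completion notification before
      * going back to sleep. */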
3111 int
3112 kibnal_scheduler(void *arg)
3113 {
3114         long            id = (long)arg;
3115         wait_queue_t    wait;
3116         char            name[16];
3117         vv_wc_t         wc;
3118         vv_return_t     vvrc;
3119         vv_return_t     vvrc2;
3120         unsigned long   flags;
3121         int             busy_loops = 0;
3122
3123         snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
3124         kportal_daemonize(name);
3125         kportal_blockallsigs();
3126
3127         init_waitqueue_entry(&wait, current);
3128
3129         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
3130
3131         while (!kibnal_data.kib_shutdown) {
3132                 if (busy_loops++ >= IBNAL_RESCHED) {
3133                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
3134                                                flags);
3135
3136                         our_cond_resched();
3137                         busy_loops = 0;
3138                         
3139                         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
3140                 }
3141
3142                 if (kibnal_data.kib_ready &&
3143                     !kibnal_data.kib_checking_cq) {
3144                         /* take ownership of completion polling */
3145                         kibnal_data.kib_checking_cq = 1;
3146                         /* Assume I'll exhaust the CQ */
3147                         kibnal_data.kib_ready = 0;
3148                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, 
3149                                                flags);
3150                         
3151                         vvrc = vv_poll_for_completion(kibnal_data.kib_hca, 
3152                                                       kibnal_data.kib_cq, &wc);
3153                         if (vvrc == vv_return_err_cq_empty) {
3154                                 vvrc2 = vv_request_completion_notification(
3155                                         kibnal_data.kib_hca, 
3156                                         kibnal_data.kib_cq, 
3157                                         vv_next_solicit_unsolicit_event);
3158                                 LASSERT (vvrc2 == vv_return_ok);
3159                         }
3160                         
3161                         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
3162                         /* give up ownership of completion polling */
3163                         kibnal_data.kib_checking_cq = 0;
3164
3165                         if (vvrc == vv_return_err_cq_empty)
3166                                 continue;
3167
3168                         LASSERT (vvrc == vv_return_ok);
3169                         /* Assume there's more: get another scheduler to check
3170                          * while I handle this completion... */
3171
3172                         kibnal_data.kib_ready = 1;
3173                         wake_up(&kibnal_data.kib_sched_waitq);
3174
3175                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
3176                                                flags);
3177
3178                         switch (kibnal_wreqid2type(wc.wr_id)) {
3179                         case IBNAL_WID_RX:
3180                                 kibnal_rx_complete(
3181                                         (kib_rx_t *)kibnal_wreqid2ptr(wc.wr_id),
3182                                         wc.completion_status,
3183                                         wc.num_bytes_transfered);
3184                                 break;
3185
3186                         case IBNAL_WID_TX:
3187                                 kibnal_tx_complete(
3188                                         (kib_tx_t *)kibnal_wreqid2ptr(wc.wr_id),
3189                                         wc.completion_status);
3190                                 break;
3191
3192                         case IBNAL_WID_RDMA:
3193                                 /* We only get RDMA completion notification if
3194                                  * it fails.  So we just ignore them completely
3195                                  * because...
3196                                  *
3197                                  * 1) If an RDMA fails, all subsequent work
3198                                  * items, including the final SEND will fail
3199                                  * too, so I'm still guaranteed to notice that
3200                                  * this connection is hosed.
3201                                  *
3202                                  * 2) It's positively dangerous to look inside
3203                                  * the tx descriptor obtained from an RDMA work
3204                                  * item.  As soon as I drop the kib_sched_lock,
3205                                  * I give a scheduler on another CPU a chance
3206                                  * to get the final SEND completion, so the tx
3207                                  * descriptor can get freed as I inspect it. */
3208                                 CERROR ("RDMA failed: %d\n", 
3209                                         wc.completion_status);
3210                                 break;
3211
3212                         default:
3213                                 LBUG();
3214                         }
3215                         
3216                         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
3217                         continue;
3218                 }
3219
3220                 /* Nothing to do; sleep... */
3221
3222                 set_current_state(TASK_INTERRUPTIBLE);
3223                 add_wait_queue(&kibnal_data.kib_sched_waitq, &wait);
3224                 spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
3225                                        flags);
3226
3227                 schedule();
3228
3229                 remove_wait_queue(&kibnal_data.kib_sched_waitq, &wait);
3230                 set_current_state(TASK_RUNNING);
3231                 spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
3232         }
3233
3234         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
3235
3236         kibnal_thread_fini();
3237         return (0);
3238 }
3239
3240
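     /* Method table through which the portals library drives this NAL. */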
3241 lib_nal_t kibnal_lib = {
3242         .libnal_data = &kibnal_data,      /* NAL private data */
3243         .libnal_send = kibnal_send,
3244         .libnal_send_pages = kibnal_send_pages,
3245         .libnal_recv = kibnal_recv,
3246         .libnal_recv_pages = kibnal_recv_pages,
3247         .libnal_dist = kibnal_dist
3248 };