/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *   Author: Frank Zago <fzago@systemfabricworks.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "vibnal.h"

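/* Transmit descriptor lifecycle (as implemented in this file): a tx is
 * claimed from an idle list by kibnal_get_idle_tx(), filled with work
 * requests, queued on a connection, and retired here.  It may only be
 * retired once it is off all queues: not queued for sending, not awaiting
 * a send completion and not awaiting a peer response.  Under
 * IBNAL_USE_FMR the tx's fast-memory-region mapping is also retired
 * here: the handle is unmapped once every IBNAL_FMR_NMAPS uses. */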
void
kibnal_tx_done (kib_tx_t *tx)
{
        ptl_err_t        ptlrc = (tx->tx_status == 0) ? PTL_OK : PTL_FAIL;
        int              i;

        LASSERT (!in_interrupt());
        LASSERT (!tx->tx_queued);               /* mustn't be queued for sending */
        LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting sent callback */
        LASSERT (!tx->tx_waiting);              /* mustn't be awaiting peer response */

#if IBNAL_USE_FMR
        if (tx->tx_md.md_fmrcount == 0) {
                vv_return_t      vvrc;

                /* mapping must be active (it dropped fmrcount to 0) */
                LASSERT (tx->tx_md.md_active);

                vvrc = vv_unmap_fmr(kibnal_data.kib_hca,
                                    1, &tx->tx_md.md_fmrhandle);
                LASSERT (vvrc == vv_return_ok);

                tx->tx_md.md_fmrcount = IBNAL_FMR_NMAPS;
        }
        tx->tx_md.md_active = 0;
#endif
        for (i = 0; i < 2; i++) {
                /* tx may have up to 2 libmsgs to finalise */
                if (tx->tx_libmsg[i] == NULL)
                        continue;

                lib_finalize (&kibnal_lib, NULL, tx->tx_libmsg[i], ptlrc);
                tx->tx_libmsg[i] = NULL;
        }

        if (tx->tx_conn != NULL) {
                kibnal_conn_decref(tx->tx_conn);
                tx->tx_conn = NULL;
        }

        tx->tx_nwrq = 0;
        tx->tx_status = 0;

        spin_lock(&kibnal_data.kib_tx_lock);

        if (tx->tx_isnblk) {
                list_add (&tx->tx_list, &kibnal_data.kib_idle_nblk_txs);
        } else {
                list_add (&tx->tx_list, &kibnal_data.kib_idle_txs);
                wake_up (&kibnal_data.kib_idle_tx_waitq);
        }

        spin_unlock(&kibnal_data.kib_tx_lock);
}

kib_tx_t *
kibnal_get_idle_tx (int may_block)
{
        kib_tx_t      *tx = NULL;
        ENTRY;

        for (;;) {
                spin_lock(&kibnal_data.kib_tx_lock);

                /* "normal" descriptor is free */
                if (!list_empty (&kibnal_data.kib_idle_txs)) {
                        tx = list_entry (kibnal_data.kib_idle_txs.next,
                                         kib_tx_t, tx_list);
                        break;
                }

                if (!may_block) {
                        /* may dip into reserve pool */
                        if (list_empty (&kibnal_data.kib_idle_nblk_txs)) {
                                CERROR ("reserved tx desc pool exhausted\n");
                                break;
                        }

                        tx = list_entry (kibnal_data.kib_idle_nblk_txs.next,
                                         kib_tx_t, tx_list);
                        break;
                }

                /* block for idle tx */
                spin_unlock(&kibnal_data.kib_tx_lock);

                wait_event (kibnal_data.kib_idle_tx_waitq,
                            !list_empty (&kibnal_data.kib_idle_txs) ||
                            kibnal_data.kib_shutdown);
        }

        if (tx != NULL) {
                list_del (&tx->tx_list);

                /* Allocate a new completion cookie.  It might not be needed,
                 * but we've got a lock right now and we're unlikely to
                 * wrap... */
                tx->tx_cookie = kibnal_data.kib_next_tx_cookie++;

                LASSERT (tx->tx_nwrq == 0);
                LASSERT (!tx->tx_queued);
                LASSERT (tx->tx_sending == 0);
                LASSERT (!tx->tx_waiting);
                LASSERT (tx->tx_status == 0);
                LASSERT (tx->tx_conn == NULL);
                LASSERT (tx->tx_libmsg[0] == NULL);
                LASSERT (tx->tx_libmsg[1] == NULL);
        }

        spin_unlock(&kibnal_data.kib_tx_lock);

        RETURN(tx);
}

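/* (Re)post a receive buffer on its connection.  If 'credit' is set, the
 * buffer being returned to the pool gives the peer back one flow control
 * credit: the outstanding credit count is bumped and kibnal_check_sends()
 * runs so the credit can be piggybacked on an outgoing message (or on an
 * explicit NOOP once enough credits accumulate). */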
int
kibnal_post_rx (kib_rx_t *rx, int credit)
{
        kib_conn_t   *conn = rx->rx_conn;
        int           rc = 0;
        __u64         addr = (__u64)((unsigned long)rx->rx_msg);
        vv_return_t   vvrc;

        LASSERT (!in_interrupt());

        rx->rx_gl = (vv_scatgat_t) {
                .v_address = KIBNAL_ADDR2SG(addr),
                .l_key     = rx->rx_lkey,
                .length    = IBNAL_MSG_SIZE,
        };

        rx->rx_wrq = (vv_wr_t) {
                .wr_id                   = kibnal_ptr2wreqid(rx, IBNAL_WID_RX),
                .completion_notification = 1,
                .scatgat_list            = &rx->rx_gl,
                .num_of_data_segments    = 1,
                .wr_type                 = vv_wr_receive,
        };

        LASSERT (conn->ibc_state >= IBNAL_CONN_INIT);
        LASSERT (!rx->rx_posted);

        CDEBUG(D_NET, "posting rx [%d %x "LPX64"]\n",
               rx->rx_wrq.scatgat_list->length,
               rx->rx_wrq.scatgat_list->l_key,
               KIBNAL_SG2ADDR(rx->rx_wrq.scatgat_list->v_address));

        if (conn->ibc_state > IBNAL_CONN_ESTABLISHED) {
                /* No more posts for this rx; so lose its ref */
                kibnal_conn_decref(conn);
                return 0;
        }

        rx->rx_posted = 1;

        spin_lock(&conn->ibc_lock);
        /* Serialise vv_post_receive; it's not re-entrant on the same QP */
        vvrc = vv_post_receive(kibnal_data.kib_hca,
                               conn->ibc_qp, &rx->rx_wrq);
        spin_unlock(&conn->ibc_lock);

        if (vvrc == 0) {
                if (credit) {
                        spin_lock(&conn->ibc_lock);
                        conn->ibc_outstanding_credits++;
                        spin_unlock(&conn->ibc_lock);

                        kibnal_check_sends(conn);
                }
                return 0;
        }

        CERROR ("post rx -> "LPX64" failed %d\n",
                conn->ibc_peer->ibp_nid, vvrc);
        rc = -EIO;
        kibnal_close_conn(rx->rx_conn, rc);
        /* No more posts for this rx; so lose its ref */
        kibnal_conn_decref(conn);
        return rc;
}

int
kibnal_post_receives (kib_conn_t *conn)
{
        int    i;
        int    rc;

        LASSERT (conn->ibc_state < IBNAL_CONN_ESTABLISHED);
        LASSERT (conn->ibc_comms_error == 0);

        for (i = 0; i < IBNAL_RX_MSGS; i++) {
                /* +1 ref for rx desc.  This ref remains until kibnal_post_rx
                 * fails (i.e. actual failure or we're disconnecting) */
                kibnal_conn_addref(conn);
                rc = kibnal_post_rx (&conn->ibc_rxs[i], 0);
                if (rc != 0)
                        return rc;
        }

        return 0;
}

kib_tx_t *
kibnal_find_waiting_tx_locked(kib_conn_t *conn, int txtype, __u64 cookie)
{
        struct list_head   *tmp;

        list_for_each(tmp, &conn->ibc_active_txs) {
                kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);

                LASSERT (!tx->tx_queued);
                LASSERT (tx->tx_sending != 0 || tx->tx_waiting);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_waiting &&
                    tx->tx_msg->ibm_type == txtype)
                        return tx;

                CWARN("Bad completion: %swaiting, type %x (wanted %x)\n",
                      tx->tx_waiting ? "" : "NOT ",
                      tx->tx_msg->ibm_type, txtype);
        }
        return NULL;
}

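/* A completion message from the peer finishes a tx that is blocked in
 * 'waiting' state.  The tx is located by (type, cookie) on the active
 * list; if no match is found the peer is misbehaving and the connection
 * is torn down with -EPROTO. */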
void
kibnal_handle_completion(kib_conn_t *conn, int txtype, int status, __u64 cookie)
{
        kib_tx_t    *tx;
        int          idle;

        spin_lock(&conn->ibc_lock);

        tx = kibnal_find_waiting_tx_locked(conn, txtype, cookie);
        if (tx == NULL) {
                spin_unlock(&conn->ibc_lock);

                CWARN("Unmatched completion type %x cookie "LPX64
                      " from "LPX64"\n",
                      txtype, cookie, conn->ibc_peer->ibp_nid);
                kibnal_close_conn (conn, -EPROTO);
                return;
        }

        if (tx->tx_status == 0) {               /* success so far */
                if (status < 0) {               /* failed? */
                        tx->tx_status = status;
                } else if (txtype == IBNAL_MSG_GET_REQ) {
                        /* XXX layering violation: set REPLY data length */
                        LASSERT (tx->tx_libmsg[1] != NULL);
                        LASSERT (tx->tx_libmsg[1]->ev.type ==
                                 PTL_EVENT_REPLY_END);

                        tx->tx_libmsg[1]->ev.mlength = status;
                }
        }

        tx->tx_waiting = 0;

        idle = !tx->tx_queued && (tx->tx_sending == 0);
        if (idle)
                list_del(&tx->tx_list);

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kibnal_tx_done(tx);
}

void
kibnal_send_completion (kib_conn_t *conn, int type, int status, __u64 cookie)
{
        kib_tx_t    *tx = kibnal_get_idle_tx(0);

        if (tx == NULL) {
                CERROR("Can't get tx for completion %x for "LPX64"\n",
                       type, conn->ibc_peer->ibp_nid);
                return;
        }

        tx->tx_msg->ibm_u.completion.ibcm_status = status;
        tx->tx_msg->ibm_u.completion.ibcm_cookie = cookie;
        kibnal_init_tx_msg(tx, type, sizeof(kib_completion_msg_t));

        kibnal_queue_tx(tx, conn);
}

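/* Dispatch an incoming message by type.  A sketch of the RDMA exchanges
 * handled below:
 *
 *   PUT: sender sends PUT_REQ; if the receiver wants the payload it
 *        replies PUT_ACK describing its sink buffer, the sender RDMA
 *        writes the data and completes with PUT_DONE; otherwise the
 *        receiver replies PUT_NAK.
 *   GET: sender sends GET_REQ describing its sink buffer; the receiver
 *        RDMA writes the reply payload and completes with GET_DONE
 *        (-ENODATA if the GET didn't match).
 *
 * Small payloads travel inline as IBNAL_MSG_IMMEDIATE, and NOOPs exist
 * only to return flow control credits. */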
void
kibnal_handle_rx (kib_rx_t *rx)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        int           credits = msg->ibm_credits;
        kib_tx_t     *tx;
        int           rc;

        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);

        CDEBUG (D_NET, "Received %x[%d] from "LPX64"\n",
                msg->ibm_type, credits, conn->ibc_peer->ibp_nid);

        if (credits != 0) {
                /* Have I received credits that will let me send? */
                spin_lock(&conn->ibc_lock);
                conn->ibc_credits += credits;
                spin_unlock(&conn->ibc_lock);

                kibnal_check_sends(conn);
        }

        switch (msg->ibm_type) {
        default:
                CERROR("Bad IBNAL message type %x from "LPX64"\n",
                       msg->ibm_type, conn->ibc_peer->ibp_nid);
                break;

        case IBNAL_MSG_NOOP:
                break;

        case IBNAL_MSG_IMMEDIATE:
                lib_parse(&kibnal_lib, &msg->ibm_u.immediate.ibim_hdr, rx);
                break;

        case IBNAL_MSG_PUT_REQ:
                rx->rx_responded = 0;
                lib_parse(&kibnal_lib, &msg->ibm_u.putreq.ibprm_hdr, rx);
                if (rx->rx_responded)
                        break;

                /* I wasn't asked to transfer any payload data.  This happens
                 * if the PUT didn't match, or got truncated. */
                kibnal_send_completion(rx->rx_conn, IBNAL_MSG_PUT_NAK, 0,
                                       msg->ibm_u.putreq.ibprm_cookie);
                break;

        case IBNAL_MSG_PUT_NAK:
                CWARN ("PUT_NAK from "LPX64"\n", conn->ibc_peer->ibp_nid);
                kibnal_handle_completion(conn, IBNAL_MSG_PUT_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBNAL_MSG_PUT_ACK:
                spin_lock(&conn->ibc_lock);
                tx = kibnal_find_waiting_tx_locked(conn, IBNAL_MSG_PUT_REQ,
                                                   msg->ibm_u.putack.ibpam_src_cookie);
                if (tx != NULL)
                        list_del(&tx->tx_list);
                spin_unlock(&conn->ibc_lock);

                if (tx == NULL) {
                        CERROR("Unmatched PUT_ACK from "LPX64"\n",
                               conn->ibc_peer->ibp_nid);
                        kibnal_close_conn(conn, -EPROTO);
                        break;
                }

                LASSERT (tx->tx_waiting);
                /* CAVEAT EMPTOR: I could be racing with tx_complete, but...
                 * (a) I can overwrite tx_msg since my peer has received it!
                 * (b) tx_waiting set tells tx_complete() it's not done. */

                tx->tx_nwrq = 0;                /* overwrite PUT_REQ */

                rc = kibnal_init_rdma(tx, IBNAL_MSG_PUT_DONE,
                                      kibnal_rd_size(&msg->ibm_u.putack.ibpam_rd),
                                      &msg->ibm_u.putack.ibpam_rd,
                                      msg->ibm_u.putack.ibpam_dst_cookie);
                if (rc < 0)
                        CERROR("Can't setup rdma for PUT to "LPX64": %d\n",
                               conn->ibc_peer->ibp_nid, rc);

                spin_lock(&conn->ibc_lock);
                if (tx->tx_status == 0 && rc < 0)
                        tx->tx_status = rc;
                tx->tx_waiting = 0;             /* clear waiting and queue atomically */
                kibnal_queue_tx_locked(tx, conn);
                spin_unlock(&conn->ibc_lock);
                break;

        case IBNAL_MSG_PUT_DONE:
                kibnal_handle_completion(conn, IBNAL_MSG_PUT_ACK,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBNAL_MSG_GET_REQ:
                rx->rx_responded = 0;
                lib_parse(&kibnal_lib, &msg->ibm_u.get.ibgm_hdr, rx);
                if (rx->rx_responded)           /* I responded to the GET_REQ */
                        break;
                /* NB GET didn't match (I'd have responded even with no payload
                 * data) */
                kibnal_send_completion(rx->rx_conn, IBNAL_MSG_GET_DONE, -ENODATA,
                                       msg->ibm_u.get.ibgm_cookie);
                break;

        case IBNAL_MSG_GET_DONE:
                kibnal_handle_completion(conn, IBNAL_MSG_GET_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;
        }

        kibnal_post_rx(rx, 1);
}

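/* Receive completion handler.  A completed rx is handed up only if the
 * hardware status is good, the message unpacks, the src/dst stamps match
 * this connection's incarnation and it arrives in sequence; anything
 * else closes the connection.  Completions that race with connection
 * establishment are stashed on ibc_early_rxs until the handshake
 * completes. */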
void
kibnal_rx_complete (kib_rx_t *rx, vv_comp_status_t vvrc, int nob, __u64 rxseq)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        unsigned long flags;
        int           rc;

        CDEBUG (D_NET, "rx %p conn %p\n", rx, conn);
        LASSERT (rx->rx_posted);
        rx->rx_posted = 0;

        if (conn->ibc_state > IBNAL_CONN_ESTABLISHED)
                goto ignore;

        if (vvrc != vv_comp_status_success) {
                CERROR("Rx from "LPX64" failed: %d\n",
                       conn->ibc_peer->ibp_nid, vvrc);
                goto failed;
        }

        rc = kibnal_unpack_msg(msg, nob);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from "LPX64"\n",
                        rc, conn->ibc_peer->ibp_nid);
                goto failed;
        }

        if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
            msg->ibm_srcstamp != conn->ibc_incarnation ||
            msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid ||
            msg->ibm_dststamp != kibnal_data.kib_incarnation) {
                CERROR ("Stale rx from "LPX64"\n",
                        conn->ibc_peer->ibp_nid);
                goto failed;
        }

        if (msg->ibm_seq != rxseq) {
                CERROR ("Out-of-sequence rx from "LPX64
                        ": got "LPD64" but expected "LPD64"\n",
                        conn->ibc_peer->ibp_nid, msg->ibm_seq, rxseq);
                goto failed;
        }

        /* racing with connection establishment/teardown! */

        if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
                /* must check holding global lock to eliminate race */
                if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                        list_add_tail(&rx->rx_list, &conn->ibc_early_rxs);
                        write_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                                flags);
                        return;
                }
                write_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                        flags);
        }
        kibnal_handle_rx(rx);
        return;

 failed:
        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        kibnal_close_conn(conn, -EIO);
 ignore:
        /* Don't re-post rx & drop its ref on conn */
        kibnal_conn_decref(conn);
}

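/* Resolve a kernel virtual address to its struct page: vmalloc addresses
 * via vmalloc_to_page(), everything else via virt_to_page().  Highmem
 * kmap addresses must never show up here, since highmem pages are only
 * used for page-based (kiov) I/O. */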
struct page *
kibnal_kvaddr_to_page (unsigned long vaddr)
{
        struct page *page;

        if (vaddr >= VMALLOC_START &&
            vaddr < VMALLOC_END) {
                page = vmalloc_to_page ((void *)vaddr);
                LASSERT (page != NULL);
                return page;
        }
#ifdef CONFIG_HIGHMEM
        if (vaddr >= PKMAP_BASE &&
            vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE)) {
                /* No highmem kmap addresses here: highmem pages are only
                 * used for bulk (kiov) I/O */
                CERROR("Can't find page for highmem address %lx\n", vaddr);
                LBUG();
        }
#endif
        page = virt_to_page (vaddr);
        LASSERT (page != NULL);
        return page;
}

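/* Two alternative strategies for describing RDMA buffers follow.  With
 * IBNAL_USE_FMR unset, each physical fragment becomes a separate RDMA
 * fragment mapped through the HCA's general memory region.  With
 * IBNAL_USE_FMR set, the pages are bound into a fast memory region so
 * the whole payload appears as a single virtually contiguous fragment. */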
#if !IBNAL_USE_FMR
int
kibnal_append_rdfrag(kib_rdma_desc_t *rd, int active, struct page *page,
                     unsigned long page_offset, unsigned long len)
{
        kib_rdma_frag_t *frag = &rd->rd_frags[rd->rd_nfrag];
        vv_l_key_t       l_key;
        vv_r_key_t       r_key;
        __u64            addr;
        __u64            frag_addr;
        vv_mem_reg_h_t   mem_h;
        vv_return_t      vvrc;

        if (rd->rd_nfrag >= IBNAL_MAX_RDMA_FRAGS) {
                CERROR ("Too many RDMA fragments\n");
                return -EMSGSIZE;
        }

        /* Try to create an address that adaptor-tavor will munge into a valid
         * network address, given how it maps all phys mem into 1 region */
        addr = kibnal_page2phys(page) + page_offset + PAGE_OFFSET;

        vvrc = vv_get_gen_mr_attrib(kibnal_data.kib_hca,
                                    (void *)((unsigned long)addr),
                                    len, &mem_h, &l_key, &r_key);
        LASSERT (vvrc == vv_return_ok);

        if (active) {
                if (rd->rd_nfrag == 0) {
                        rd->rd_key = l_key;
                } else if (l_key != rd->rd_key) {
                        CERROR ("> 1 key for single RDMA desc\n");
                        return -EINVAL;
                }
                frag_addr = addr;
        } else {
                if (rd->rd_nfrag == 0) {
                        rd->rd_key = r_key;
                } else if (r_key != rd->rd_key) {
                        CERROR ("> 1 key for single RDMA desc\n");
                        return -EINVAL;
                }

                frag_addr = kibnal_addr2net(addr);
        }

        kibnal_rf_set(frag, frag_addr, len);

        CDEBUG(D_NET,"map frag [%d][%d %x %08x%08x] "LPX64"\n",
               rd->rd_nfrag, frag->rf_nob, rd->rd_key,
               frag->rf_addr_hi, frag->rf_addr_lo, frag_addr);

        rd->rd_nfrag++;
        return 0;
}

int
kibnal_setup_rd_iov(kib_tx_t *tx, kib_rdma_desc_t *rd,
                    vv_access_con_bit_mask_t access,
                    int niov, struct iovec *iov, int offset, int nob)
{
        /* active if I'm sending */
        int           active = ((access & vv_acc_r_mem_write) == 0);
        int           fragnob;
        int           rc;
        unsigned long vaddr;
        struct page  *page;
        int           page_offset;

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT ((rd != tx->tx_rd) == !active);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        rd->rd_nfrag = 0;
        do {
                LASSERT (niov > 0);

                vaddr = ((unsigned long)iov->iov_base) + offset;
                page_offset = vaddr & (PAGE_SIZE - 1);
                page = kibnal_kvaddr_to_page(vaddr);
                if (page == NULL) {
                        CERROR ("Can't find page\n");
                        return -EFAULT;
                }

                fragnob = min((int)(iov->iov_len - offset), nob);
                fragnob = min(fragnob, (int)PAGE_SIZE - page_offset);

                rc = kibnal_append_rdfrag(rd, active, page,
                                          page_offset, fragnob);
                if (rc != 0)
                        return rc;

                if (offset + fragnob < iov->iov_len) {
                        offset += fragnob;
                } else {
                        offset = 0;
                        iov++;
                        niov--;
                }
                nob -= fragnob;
        } while (nob > 0);

        return 0;
}

int
kibnal_setup_rd_kiov (kib_tx_t *tx, kib_rdma_desc_t *rd,
                      vv_access_con_bit_mask_t access,
                      int nkiov, ptl_kiov_t *kiov, int offset, int nob)
{
        /* active if I'm sending */
        int            active = ((access & vv_acc_r_mem_write) == 0);
        int            fragnob;
        int            rc;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT ((rd != tx->tx_rd) == !active);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        rd->rd_nfrag = 0;
        do {
                LASSERT (nkiov > 0);
                fragnob = min((int)(kiov->kiov_len - offset), nob);

                rc = kibnal_append_rdfrag(rd, active, kiov->kiov_page,
                                          kiov->kiov_offset + offset,
                                          fragnob);
                if (rc != 0)
                        return rc;

                offset = 0;
                kiov++;
                nkiov--;
                nob -= fragnob;
        } while (nob > 0);

        return 0;
}
#else
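/* FMR path: kibnal_map_tx() binds the physical pages collected by the
 * callers below into the tx's fast memory region, yielding one contiguous
 * (rd_addr, rd_nob, rd_key) descriptor instead of a fragment list.  The
 * mapping stays active until kibnal_tx_done() retires the tx; the FMR
 * handle is only explicitly unmapped every IBNAL_FMR_NMAPS mappings. */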
int
kibnal_map_tx (kib_tx_t *tx, kib_rdma_desc_t *rd, int active,
               int npages, unsigned long page_offset, int nob)
{
        vv_return_t   vvrc;
        vv_fmr_map_t  map_props;

        LASSERT ((rd != tx->tx_rd) == !active);
        LASSERT (!tx->tx_md.md_active);
        LASSERT (tx->tx_md.md_fmrcount > 0);
        LASSERT (page_offset < PAGE_SIZE);
        LASSERT (npages >= (1 + ((page_offset + nob - 1)>>PAGE_SHIFT)));
        LASSERT (npages <= PTL_MD_MAX_IOV);

        memset(&map_props, 0, sizeof(map_props));

        map_props.start          = (void *)page_offset;
        map_props.size           = nob;
        map_props.page_array_len = npages;
        map_props.page_array     = tx->tx_pages;

        vvrc = vv_map_fmr(kibnal_data.kib_hca, tx->tx_md.md_fmrhandle,
                          &map_props, &tx->tx_md.md_lkey, &tx->tx_md.md_rkey);
        if (vvrc != vv_return_ok) {
                CERROR ("Can't map vaddr %p for %d in %d pages: %d\n",
                        map_props.start, nob, npages, vvrc);
                return -EFAULT;
        }

        tx->tx_md.md_addr = (unsigned long)map_props.start;
        tx->tx_md.md_active = 1;
        tx->tx_md.md_fmrcount--;

        rd->rd_key = active ? tx->tx_md.md_lkey : tx->tx_md.md_rkey;
        rd->rd_nob = nob;
        rd->rd_addr = tx->tx_md.md_addr;

        /* Compensate for adaptor-tavor's munging of gatherlist addresses */
        if (active)
                rd->rd_addr += PAGE_OFFSET;

        return 0;
}

int
kibnal_setup_rd_iov (kib_tx_t *tx, kib_rdma_desc_t *rd,
                     vv_access_con_bit_mask_t access,
                     int niov, struct iovec *iov, int offset, int nob)
{
        /* active if I'm sending */
        int           active = ((access & vv_acc_r_mem_write) == 0);
        int           resid;
        int           fragnob;
        struct page  *page;
        int           npages;
        unsigned long page_offset;
        unsigned long vaddr;

        LASSERT (nob > 0);
        LASSERT (niov > 0);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR ("Can't map multiple vaddr fragments\n");
                return (-EMSGSIZE);
        }

        vaddr = ((unsigned long)iov->iov_base) + offset;

        page_offset = vaddr & (PAGE_SIZE - 1);
        resid = nob;
        npages = 0;

        do {
                LASSERT (npages < PTL_MD_MAX_IOV);

                page = kibnal_kvaddr_to_page(vaddr);
                if (page == NULL) {
                        CERROR("Can't find page for %lu\n", vaddr);
                        return -EFAULT;
                }

                tx->tx_pages[npages++] = kibnal_page2phys(page);

                fragnob = PAGE_SIZE - (vaddr & (PAGE_SIZE - 1));
                vaddr += fragnob;
                resid -= fragnob;

        } while (resid > 0);

        return kibnal_map_tx(tx, rd, active, npages, page_offset, nob);
}

int
kibnal_setup_rd_kiov (kib_tx_t *tx, kib_rdma_desc_t *rd,
                      vv_access_con_bit_mask_t access,
                      int nkiov, ptl_kiov_t *kiov, int offset, int nob)
{
        /* active if I'm sending */
        int            active = ((access & vv_acc_r_mem_write) == 0);
        int            resid;
        int            npages;
        unsigned long  page_offset;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (nkiov <= PTL_MD_MAX_IOV);
        LASSERT (!tx->tx_md.md_active);
        LASSERT ((rd != tx->tx_rd) == !active);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        page_offset = kiov->kiov_offset + offset;

        resid = offset + nob;
        npages = 0;

        do {
                LASSERT (npages < PTL_MD_MAX_IOV);
                LASSERT (nkiov > 0);

                if ((npages > 0 && kiov->kiov_offset != 0) ||
                    (resid > kiov->kiov_len &&
                     (kiov->kiov_offset + kiov->kiov_len) != PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR ("Can't make payload contiguous in I/O VM: "
                                "page %d, offset %d, len %d\n",
                                npages, kiov->kiov_offset, kiov->kiov_len);

                        return -EINVAL;
                }

                tx->tx_pages[npages++] = kibnal_page2phys(kiov->kiov_page);
                resid -= kiov->kiov_len;
                kiov++;
                nkiov--;
        } while (resid > 0);

        return kibnal_map_tx(tx, rd, active, npages, page_offset, nob);
}
#endif

kib_conn_t *
kibnal_find_conn_locked (kib_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->ibp_conns) {
                return (list_entry(tmp, kib_conn_t, ibc_list));
        }

        return (NULL);
}

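/* Send scheduler.  Invariants enforced below: at most
 * IBNAL_MSG_QUEUE_SIZE sends are posted at once, every send spends one
 * credit, and the last credit is never spent unless credits are also
 * being returned, so the peer can always send credits back (avoiding
 * flow control deadlock).  A NOOP is generated to return credits when
 * nothing else is queued and IBNAL_CREDIT_HIGHWATER has been reached. */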
void
kibnal_check_sends (kib_conn_t *conn)
{
        kib_tx_t       *tx;
        vv_return_t     vvrc;
        int             rc;
        int             done;

        /* Don't send anything until after the connection is established */
        if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                CDEBUG(D_NET, LPX64": too soon\n", conn->ibc_peer->ibp_nid);
                return;
        }

        spin_lock(&conn->ibc_lock);

        LASSERT (conn->ibc_nsends_posted <= IBNAL_MSG_QUEUE_SIZE);

        if (list_empty(&conn->ibc_tx_queue) &&
            conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER) {
                spin_unlock(&conn->ibc_lock);

                tx = kibnal_get_idle_tx(0);     /* don't block */
                if (tx != NULL)
                        kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);

                spin_lock(&conn->ibc_lock);

                if (tx != NULL)
                        kibnal_queue_tx_locked(tx, conn);
        }

        while (!list_empty (&conn->ibc_tx_queue)) {
                tx = list_entry (conn->ibc_tx_queue.next, kib_tx_t, tx_list);

                LASSERT (tx->tx_queued);
                /* We rely on this for QP sizing */
                LASSERT (tx->tx_nwrq > 0 && tx->tx_nwrq <= 1 + IBNAL_MAX_RDMA_FRAGS);

                LASSERT (conn->ibc_outstanding_credits >= 0);
                LASSERT (conn->ibc_outstanding_credits <= IBNAL_MSG_QUEUE_SIZE);
                LASSERT (conn->ibc_credits >= 0);
                LASSERT (conn->ibc_credits <= IBNAL_MSG_QUEUE_SIZE);

                if (conn->ibc_nsends_posted == IBNAL_MSG_QUEUE_SIZE) {
                        CDEBUG(D_NET, LPX64": posted enough\n",
                               conn->ibc_peer->ibp_nid);
                        break;
                }

                if (conn->ibc_credits == 0) {   /* no credits */
                        CDEBUG(D_NET, LPX64": no credits\n",
                               conn->ibc_peer->ibp_nid);
                        break;
                }

                if (conn->ibc_credits == 1 &&   /* last credit reserved for */
                    conn->ibc_outstanding_credits == 0) { /* giving back credits */
                        CDEBUG(D_NET, LPX64": not using last credit\n",
                               conn->ibc_peer->ibp_nid);
                        break;
                }

                list_del (&tx->tx_list);
                tx->tx_queued = 0;

                /* NB don't drop ibc_lock before bumping tx_sending */

                if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
                    (!list_empty(&conn->ibc_tx_queue) ||
                     conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER)) {
                        /* redundant NOOP */
                        spin_unlock(&conn->ibc_lock);
                        kibnal_tx_done(tx);
                        spin_lock(&conn->ibc_lock);
                        CDEBUG(D_NET, LPX64": redundant noop\n",
                               conn->ibc_peer->ibp_nid);
                        continue;
                }

                kibnal_pack_msg(tx->tx_msg, conn->ibc_outstanding_credits,
                                conn->ibc_peer->ibp_nid, conn->ibc_incarnation,
                                conn->ibc_txseq);

                conn->ibc_txseq++;
                conn->ibc_outstanding_credits = 0;
                conn->ibc_nsends_posted++;
                conn->ibc_credits--;

                /* CAVEAT EMPTOR!  This tx could be the PUT_DONE of an RDMA
                 * PUT.  If so, it was first queued here as a PUT_REQ, sent and
                 * stashed on ibc_active_txs, matched by an incoming PUT_ACK,
                 * and then re-queued here.  It's (just) possible that
                 * tx_sending is non-zero if we've not done the tx_complete() from
                 * the first send; hence the ++ rather than = below. */
                tx->tx_sending++;

                list_add (&tx->tx_list, &conn->ibc_active_txs);

                /* Keep holding ibc_lock while posting sends on this
                 * connection; vv_post_send() isn't re-entrant on the same
                 * QP!! */

                LASSERT (tx->tx_nwrq > 0);
#if 0
                if (tx->tx_wrq[0].wr_type == vv_wr_rdma_write)
                        CDEBUG(D_WARNING, "WORK[0]: RDMA gl %p for %d k %x -> "LPX64" k %x\n",
                               tx->tx_wrq[0].scatgat_list->v_address,
                               tx->tx_wrq[0].scatgat_list->length,
                               tx->tx_wrq[0].scatgat_list->l_key,
                               tx->tx_wrq[0].type.send.send_qp_type.rc_type.r_addr,
                               tx->tx_wrq[0].type.send.send_qp_type.rc_type.r_r_key);
                else
                        CDEBUG(D_WARNING, "WORK[0]: %s gl %p for %d k %x\n",
                               tx->tx_wrq[0].wr_type == vv_wr_send ? "SEND" : "????",
                               tx->tx_wrq[0].scatgat_list->v_address,
                               tx->tx_wrq[0].scatgat_list->length,
                               tx->tx_wrq[0].scatgat_list->l_key);

                if (tx->tx_nwrq > 1) {
                        if (tx->tx_wrq[1].wr_type == vv_wr_rdma_write)
                                CDEBUG(D_WARNING, "WORK[1]: RDMA gl %p for %d k %x -> "LPX64" k %x\n",
                                       tx->tx_wrq[1].scatgat_list->v_address,
                                       tx->tx_wrq[1].scatgat_list->length,
                                       tx->tx_wrq[1].scatgat_list->l_key,
                                       tx->tx_wrq[1].type.send.send_qp_type.rc_type.r_addr,
                                       tx->tx_wrq[1].type.send.send_qp_type.rc_type.r_r_key);
                        else
                                CDEBUG(D_WARNING, "WORK[1]: %s gl %p for %d k %x\n",
                                       tx->tx_wrq[1].wr_type == vv_wr_send ? "SEND" : "????",
                                       tx->tx_wrq[1].scatgat_list->v_address,
                                       tx->tx_wrq[1].scatgat_list->length,
                                       tx->tx_wrq[1].scatgat_list->l_key);
                }
#endif
                rc = -ECONNABORTED;
                vvrc = vv_return_ok;
                if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                        tx->tx_status = 0;
                        vvrc = vv_post_send_list(kibnal_data.kib_hca,
                                                 conn->ibc_qp,
                                                 tx->tx_nwrq,
                                                 tx->tx_wrq,
                                                 vv_operation_type_send_rc);
                        rc = (vvrc == vv_return_ok) ? 0 : -EIO;
                }

                if (rc != 0) {
                        /* NB credits are transferred in the actual
                         * message, which can only be the last work item */
                        conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
                        conn->ibc_credits++;
                        conn->ibc_nsends_posted--;

                        tx->tx_status = rc;
                        tx->tx_waiting = 0;
                        tx->tx_sending--;

                        done = (tx->tx_sending == 0);
                        if (done)
                                list_del (&tx->tx_list);

                        spin_unlock(&conn->ibc_lock);

                        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
                                CERROR ("Error %d posting transmit to "LPX64"\n",
                                        vvrc, conn->ibc_peer->ibp_nid);
                        else
                                CDEBUG (D_NET, "Error %d posting transmit to "
                                        LPX64"\n", rc, conn->ibc_peer->ibp_nid);

                        kibnal_close_conn (conn, rc);

                        if (done)
                                kibnal_tx_done (tx);
                        return;
                }
        }

        spin_unlock(&conn->ibc_lock);
}

void
kibnal_tx_complete (kib_tx_t *tx, vv_comp_status_t vvrc)
{
        kib_conn_t   *conn = tx->tx_conn;
        int           failed = (vvrc != vv_comp_status_success);
        int           idle;

        CDEBUG(D_NET, "tx %p conn %p sending %d nwrq %d vvrc %d\n",
               tx, conn, tx->tx_sending, tx->tx_nwrq, vvrc);

        LASSERT (tx->tx_sending > 0);

        if (failed &&
            tx->tx_status == 0 &&
            conn->ibc_state == IBNAL_CONN_ESTABLISHED)
                CERROR("tx -> "LPX64" type %x cookie "LPX64
                       " sending %d waiting %d: failed %d\n",
                       conn->ibc_peer->ibp_nid, tx->tx_msg->ibm_type,
                       tx->tx_cookie, tx->tx_sending, tx->tx_waiting, vvrc);

        spin_lock(&conn->ibc_lock);

        /* I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'. */

        tx->tx_sending--;
        conn->ibc_nsends_posted--;

        if (failed) {
                tx->tx_waiting = 0;
                tx->tx_status = -EIO;
        }

        idle = (tx->tx_sending == 0) &&         /* This is the final callback */
               !tx->tx_waiting &&               /* Not waiting for peer */
               !tx->tx_queued;                  /* Not re-queued (PUT_DONE) */
        if (idle)
                list_del(&tx->tx_list);

        kibnal_conn_addref(conn);               /* 1 ref for me.... */

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kibnal_tx_done (tx);

        if (failed)
                kibnal_close_conn (conn, -EIO);
        else
                kibnal_check_sends(conn);

        kibnal_conn_decref(conn);               /* ...until here */
}

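/* Append a 'send' work request carrying the tx's message buffer.  Each
 * tx has room for 1 + IBNAL_MAX_RDMA_FRAGS work requests: the RDMA
 * writes (if any) come first and this message send goes last, so the
 * completion message and the credits it carries only travel in the
 * final work item. */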
void
kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob)
{
        vv_scatgat_t *gl = &tx->tx_gl[tx->tx_nwrq];
        vv_wr_t      *wrq = &tx->tx_wrq[tx->tx_nwrq];
        int           nob = offsetof (kib_msg_t, ibm_u) + body_nob;
        __u64         addr = (__u64)((unsigned long)tx->tx_msg);

        LASSERT (tx->tx_nwrq >= 0 &&
                 tx->tx_nwrq < (1 + IBNAL_MAX_RDMA_FRAGS));
        LASSERT (nob <= IBNAL_MSG_SIZE);

        kibnal_init_msg(tx->tx_msg, type, body_nob);

        *gl = (vv_scatgat_t) {
                .v_address = KIBNAL_ADDR2SG(addr),
                .l_key     = tx->tx_lkey,
                .length    = nob,
        };

        memset(wrq, 0, sizeof(*wrq));

        wrq->wr_id = kibnal_ptr2wreqid(tx, IBNAL_WID_TX);
        wrq->wr_type = vv_wr_send;
        wrq->scatgat_list = gl;
        wrq->num_of_data_segments = 1;
        wrq->completion_notification = 1;
        wrq->type.send.solicited_event = 1;
        wrq->type.send.immidiate_data_indicator = 0;
        wrq->type.send.send_qp_type.rc_type.fance_indicator = 0;

        tx->tx_nwrq++;
}

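/* Set up the RDMA write(s) that move 'nob' bytes from this tx's source
 * descriptor into 'dstrd', followed by a 'type' completion message
 * carrying 'dstcookie'.  Returns the number of bytes to be transferred,
 * or a negative errno, in which case no RDMA is performed and only the
 * completion message, carrying the error as its status, is sent. */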
int
kibnal_init_rdma (kib_tx_t *tx, int type, int nob,
                  kib_rdma_desc_t *dstrd, __u64 dstcookie)
{
        kib_msg_t       *ibmsg = tx->tx_msg;
        kib_rdma_desc_t *srcrd = tx->tx_rd;
        vv_scatgat_t    *gl;
        vv_wr_t         *wrq;
        int              rc;

#if IBNAL_USE_FMR
        LASSERT (tx->tx_nwrq == 0);

        gl = &tx->tx_gl[0];
        gl->length    = nob;
        gl->v_address = KIBNAL_ADDR2SG(srcrd->rd_addr);
        gl->l_key     = srcrd->rd_key;

        wrq = &tx->tx_wrq[0];

        wrq->wr_id = kibnal_ptr2wreqid(tx, IBNAL_WID_RDMA);
        wrq->completion_notification = 0;
        wrq->scatgat_list = gl;
        wrq->num_of_data_segments = 1;
        wrq->wr_type = vv_wr_rdma_write;
        wrq->type.send.solicited_event = 0;
        wrq->type.send.send_qp_type.rc_type.fance_indicator = 0;
        wrq->type.send.send_qp_type.rc_type.r_addr = dstrd->rd_addr;
        wrq->type.send.send_qp_type.rc_type.r_r_key = dstrd->rd_key;

        tx->tx_nwrq = 1;
        rc = nob;
#else
        /* CAVEAT EMPTOR: this 'consumes' the frags in 'dstrd' */
        int              resid = nob;
        kib_rdma_frag_t *srcfrag;
        int              srcidx;
        kib_rdma_frag_t *dstfrag;
        int              dstidx;
        int              wrknob;

        /* Called by scheduler */
        LASSERT (!in_interrupt());

        LASSERT (type == IBNAL_MSG_GET_DONE ||
                 type == IBNAL_MSG_PUT_DONE);

        srcidx = dstidx = 0;
        srcfrag = &srcrd->rd_frags[0];
        dstfrag = &dstrd->rd_frags[0];
        rc = resid;

        while (resid > 0) {
                if (srcidx >= srcrd->rd_nfrag) {
                        CERROR("Src buffer exhausted: %d frags\n", srcidx);
                        rc = -EPROTO;
                        break;
                }

                if (dstidx == dstrd->rd_nfrag) {
                        CERROR("Dst buffer exhausted: %d frags\n", dstidx);
                        rc = -EPROTO;
                        break;
                }

                if (tx->tx_nwrq == IBNAL_MAX_RDMA_FRAGS) {
                        CERROR("RDMA too fragmented: %d/%d src %d/%d dst frags\n",
                               srcidx, srcrd->rd_nfrag,
                               dstidx, dstrd->rd_nfrag);
                        rc = -EMSGSIZE;
                        break;
                }

                wrknob = MIN(MIN(srcfrag->rf_nob, dstfrag->rf_nob), resid);

                gl = &tx->tx_gl[tx->tx_nwrq];
                gl->v_address = KIBNAL_ADDR2SG(kibnal_rf_addr(srcfrag));
                gl->length    = wrknob;
                gl->l_key     = srcrd->rd_key;

                wrq = &tx->tx_wrq[tx->tx_nwrq];

                wrq->wr_id = kibnal_ptr2wreqid(tx, IBNAL_WID_RDMA);
                wrq->completion_notification = 0;
                wrq->scatgat_list = gl;
                wrq->num_of_data_segments = 1;
                wrq->wr_type = vv_wr_rdma_write;
                wrq->type.send.solicited_event = 0;
                wrq->type.send.send_qp_type.rc_type.fance_indicator = 0;
                wrq->type.send.send_qp_type.rc_type.r_addr = kibnal_rf_addr(dstfrag);
                wrq->type.send.send_qp_type.rc_type.r_r_key = dstrd->rd_key;

                resid -= wrknob;
                if (wrknob < srcfrag->rf_nob) {
                        kibnal_rf_set(srcfrag,
                                      kibnal_rf_addr(srcfrag) + wrknob,
                                      srcfrag->rf_nob - wrknob);
                } else {
                        srcfrag++;
                        srcidx++;
                }

                if (wrknob < dstfrag->rf_nob) {
                        kibnal_rf_set(dstfrag,
                                      kibnal_rf_addr(dstfrag) + wrknob,
                                      dstfrag->rf_nob - wrknob);
                } else {
                        dstfrag++;
                        dstidx++;
                }

                tx->tx_nwrq++;
        }

        if (rc < 0)                             /* no RDMA if completing with failure */
                tx->tx_nwrq = 0;
#endif

        ibmsg->ibm_u.completion.ibcm_status = rc;
        ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;
        kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));

        return rc;
}

void
kibnal_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
{
        spin_lock(&conn->ibc_lock);
        kibnal_queue_tx_locked (tx, conn);
        spin_unlock(&conn->ibc_lock);

        kibnal_check_sends(conn);
}

void
kibnal_schedule_peer_arp (kib_peer_t *peer)
{
        unsigned long flags;

        LASSERT (peer->ibp_connecting != 0);
        LASSERT (peer->ibp_arp_count > 0);

        kibnal_peer_addref(peer); /* extra ref for connd */

        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);

        list_add_tail (&peer->ibp_connd_list, &kibnal_data.kib_connd_peers);
        wake_up (&kibnal_data.kib_connd_waitq);

        spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
}

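/* Commit a tx to 'nid'.  The peer/connection lookup is done twice: once
 * under the read lock for the common case, then again after taking the
 * write lock (the lock is dropped in between, so the state must be
 * re-checked) when a connection has to be established.  NB interrupts
 * stay disabled across the read-to-write handover: read_unlock() does
 * not restore the flags saved by read_lock_irqsave(); they are restored
 * by the eventual write_unlock_irqrestore(). */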
void
kibnal_launch_tx (kib_tx_t *tx, ptl_nid_t nid)
{
        kib_peer_t      *peer;
        kib_conn_t      *conn;
        unsigned long    flags;
        rwlock_t        *g_lock = &kibnal_data.kib_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
        LASSERT (tx->tx_nwrq > 0);              /* work items have been set up */

        read_lock_irqsave(g_lock, flags);

        peer = kibnal_find_peer_locked (nid);
        if (peer == NULL) {
                read_unlock_irqrestore(g_lock, flags);
                tx->tx_status = -EHOSTUNREACH;
                tx->tx_waiting = 0;
                kibnal_tx_done (tx);
                return;
        }

        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                kibnal_conn_addref(conn);       /* 1 ref for me... */
                read_unlock_irqrestore(g_lock, flags);

                kibnal_queue_tx (tx, conn);
                kibnal_conn_decref(conn);       /* ...to here */
                return;
        }

        /* Making one or more connections; I'll need a write lock... */
        read_unlock(g_lock);
        write_lock(g_lock);

        peer = kibnal_find_peer_locked (nid);
        if (peer == NULL) {
                write_unlock_irqrestore(g_lock, flags);
                tx->tx_status = -EHOSTUNREACH;
                tx->tx_waiting = 0;
                kibnal_tx_done (tx);
                return;
        }

        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kibnal_conn_addref(conn);       /* 1 ref for me... */
                write_unlock_irqrestore(g_lock, flags);

                kibnal_queue_tx (tx, conn);
                kibnal_conn_decref(conn);       /* ...until here */
                return;
        }

        if (peer->ibp_connecting == 0) {
                if (!time_after_eq(jiffies, peer->ibp_reconnect_time)) {
                        write_unlock_irqrestore(g_lock, flags);
                        tx->tx_status = -EHOSTUNREACH;
                        tx->tx_waiting = 0;
                        kibnal_tx_done (tx);
                        return;
                }

                peer->ibp_connecting = 1;
                peer->ibp_arp_count = 1 + IBNAL_ARP_RETRIES;
                kibnal_schedule_peer_arp(peer);
        }

        /* A connection is being established; queue the message... */
        list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}

int
kibnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
{
        /* I would guess that if kibnal_get_peer (nid) == NULL,
           and we're not routing, then 'nid' is very distant :) */
        if (nal->libnal_ni.ni_pid.nid == nid) {
                *dist = 0;
        } else {
                *dist = 1;
        }

        return 0;
}

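/* Portals send entry point.  'private' depends on the message type: for
 * PTL_MSG_REPLY it is the kib_rx_t of the matching GET_REQ.  Small
 * messages go inline as IBNAL_MSG_IMMEDIATE; larger GETs (and,
 * presumably, PUTs in the code past this excerpt) negotiate RDMA as
 * sketched above kibnal_handle_rx(). */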
ptl_err_t
kibnal_sendmsg(lib_nal_t    *nal,
               void         *private,
               lib_msg_t    *libmsg,
               ptl_hdr_t    *hdr,
               int           type,
               ptl_nid_t     nid,
               ptl_pid_t     pid,
               unsigned int  payload_niov,
               struct iovec *payload_iov,
               ptl_kiov_t   *payload_kiov,
               int           payload_offset,
               int           payload_nob)
{
        kib_msg_t  *ibmsg;
        kib_tx_t   *tx;
        int         nob;
        int         rc;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending %d bytes in %d frags to nid:"LPX64
               " pid %d\n", payload_nob, payload_niov, nid, pid);

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= PTL_MD_MAX_IOV);

        /* Thread context */
        LASSERT (!in_interrupt());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        switch (type) {
        default:
                LBUG();
                return (PTL_FAIL);

        case PTL_MSG_REPLY: {
                /* reply's 'private' is the incoming receive */
                kib_rx_t *rx = private;

                LASSERT(rx != NULL);

                if (rx->rx_msg->ibm_type == IBNAL_MSG_IMMEDIATE) {
                        /* RDMA not expected */
                        nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
                        if (nob > IBNAL_MSG_SIZE) {
                                CERROR("REPLY for "LPX64" too big (RDMA not requested): "
                                       "%d (max for message is %d)\n",
                                       nid, payload_nob, IBNAL_MSG_SIZE);
                                CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
                                       nob, nid);
                                return PTL_FAIL;
                        }
                        break;
                }

                /* Incoming message consistent with RDMA? */
                if (rx->rx_msg->ibm_type != IBNAL_MSG_GET_REQ) {
                        CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
                               nid, rx->rx_msg->ibm_type);
                        return PTL_FAIL;
                }

                /* NB rx_complete() will send GET_NAK when I return to it from
                 * here, unless I set rx_responded! */

                tx = kibnal_get_idle_tx(0);
                if (tx == NULL) {
                        CERROR("Can't get tx for REPLY to "LPX64"\n", nid);
                        return PTL_FAIL;
                }

                if (payload_nob == 0)
                        rc = 0;
                else if (payload_kiov == NULL)
                        rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 0,
                                                 payload_niov, payload_iov,
                                                 payload_offset, payload_nob);
                else
                        rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 0,
                                                  payload_niov, payload_kiov,
                                                  payload_offset, payload_nob);
                if (rc != 0) {
                        CERROR("Can't setup GET src for "LPX64": %d\n", nid, rc);
                        kibnal_tx_done(tx);
                        return PTL_FAIL;
                }

                rc = kibnal_init_rdma(tx, IBNAL_MSG_GET_DONE, payload_nob,
                                      &rx->rx_msg->ibm_u.get.ibgm_rd,
                                      rx->rx_msg->ibm_u.get.ibgm_cookie);
                if (rc < 0) {
                        CERROR("Can't setup rdma for GET from "LPX64": %d\n",
                               nid, rc);
                } else if (rc == 0) {
                        /* No RDMA: local completion may happen now! */
                        lib_finalize (&kibnal_lib, NULL, libmsg, PTL_OK);
                } else {
                        /* RDMA: lib_finalize(libmsg) when it completes */
                        tx->tx_libmsg[0] = libmsg;
                }

                kibnal_queue_tx(tx, rx->rx_conn);
                rx->rx_responded = 1;
                return (rc >= 0) ? PTL_OK : PTL_FAIL;
        }

        case PTL_MSG_GET:
                /* will the REPLY message be small enough not to need RDMA? */
                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[libmsg->md->length]);
                if (nob <= IBNAL_MSG_SIZE)
                        break;

                tx = kibnal_get_idle_tx(1);     /* may block; caller is an app thread */
                LASSERT (tx != NULL);

                ibmsg = tx->tx_msg;
                ibmsg->ibm_u.get.ibgm_hdr = *hdr;
                ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie;

                if ((libmsg->md->options & PTL_MD_KIOV) == 0)
                        rc = kibnal_setup_rd_iov(tx, &ibmsg->ibm_u.get.ibgm_rd,
                                                 vv_acc_r_mem_write,
                                                 libmsg->md->md_niov,
                                                 libmsg->md->md_iov.iov,
                                                 0, libmsg->md->length);
                else
                        rc = kibnal_setup_rd_kiov(tx, &ibmsg->ibm_u.get.ibgm_rd,
                                                  vv_acc_r_mem_write,
                                                  libmsg->md->md_niov,
                                                  libmsg->md->md_iov.kiov,
                                                  0, libmsg->md->length);
1503                 if (rc != 0) {
1504                         CERROR("Can't setup GET sink for "LPX64": %d\n", nid, rc);
1505                         kibnal_tx_done(tx);
1506                         return PTL_FAIL;
1507                 }
1508
1509 #if IBNAL_USE_FMR
1510                 nob = sizeof(kib_get_msg_t);
1511 #else
1512                 {
1513                         int n = ibmsg->ibm_u.get.ibgm_rd.rd_nfrag;
1514                         
1515                         nob = offsetof(kib_get_msg_t, ibgm_rd.rd_frags[n]);
1516                 }
1517 #endif
1518                 kibnal_init_tx_msg(tx, IBNAL_MSG_GET_REQ, nob);
1519
1520                 tx->tx_libmsg[1] = lib_create_reply_msg(&kibnal_lib, nid, libmsg);
1521                 if (tx->tx_libmsg[1] == NULL) {
1522                         CERROR("Can't create reply for GET -> "LPX64"\n", nid);
1523                         kibnal_tx_done(tx);
1524                         return PTL_FAIL;
1525                 }
1526
1527                 tx->tx_libmsg[0] = libmsg;      /* finalise libmsg[0,1] on completion */
1528                 tx->tx_waiting = 1;             /* waiting for GET_DONE */
1529                 kibnal_launch_tx(tx, nid);
1530                 return PTL_OK;
1531
1532         case PTL_MSG_ACK:
1533                 LASSERT (payload_nob == 0);
1534                 break;
1535
1536         case PTL_MSG_PUT:
1537                 /* Is the payload small enough not to need RDMA? */
1538                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1539                 if (nob <= IBNAL_MSG_SIZE)
1540                         break;
1541
1542                 tx = kibnal_get_idle_tx(1);     /* may block: caller is app thread */
1543                 LASSERT (tx != NULL);
1544
1545                 if (payload_kiov == NULL)
1546                         rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 0,
1547                                                  payload_niov, payload_iov,
1548                                                  payload_offset, payload_nob);
1549                 else
1550                         rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 0,
1551                                                   payload_niov, payload_kiov,
1552                                                   payload_offset, payload_nob);
1553                 if (rc != 0) {
1554                         CERROR("Can't setup PUT src for "LPX64": %d\n", nid, rc);
1555                         kibnal_tx_done(tx);
1556                         return PTL_FAIL;
1557                 }
1558
1559                 ibmsg = tx->tx_msg;
1560                 ibmsg->ibm_u.putreq.ibprm_hdr = *hdr;
1561                 ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie;
1562                 kibnal_init_tx_msg(tx, IBNAL_MSG_PUT_REQ, sizeof(kib_putreq_msg_t));
1563
1564                 tx->tx_libmsg[0] = libmsg;      /* finalise libmsg on completion */
1565                 tx->tx_waiting = 1;             /* waiting for PUT_{ACK,NAK} */
1566                 kibnal_launch_tx(tx, nid);
1567                 return PTL_OK;
1568         }
1569
1570         LASSERT (offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob])
1571                  <= IBNAL_MSG_SIZE);
1572
1573         tx = kibnal_get_idle_tx(!(type == PTL_MSG_ACK ||
1574                                   type == PTL_MSG_REPLY));
1575         if (tx == NULL) {
1576                 CERROR ("Can't send %d to "LPX64": tx descs exhausted\n", type, nid);
1577                 return PTL_NO_SPACE;
1578         }
1579
1580         ibmsg = tx->tx_msg;
1581         ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1582
1583         if (payload_nob > 0) {
1584                 if (payload_kiov != NULL)
1585                         lib_copy_kiov2buf(ibmsg->ibm_u.immediate.ibim_payload,
1586                                           payload_niov, payload_kiov,
1587                                           payload_offset, payload_nob);
1588                 else
1589                         lib_copy_iov2buf(ibmsg->ibm_u.immediate.ibim_payload,
1590                                          payload_niov, payload_iov,
1591                                          payload_offset, payload_nob);
1592         }
1593
1594         nob = offsetof(kib_immediate_msg_t, ibim_payload[payload_nob]);
1595         kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE, nob);
1596
1597         tx->tx_libmsg[0] = libmsg;              /* finalise libmsg on completion */
1598         kibnal_launch_tx(tx, nid);
1599         return PTL_OK;
1600 }
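
/* Editor's sketch (disabled; not part of the NAL): every branch of
 * kibnal_sendmsg() above turns on one computation -- a message goes out
 * as IMMEDIATE only if the fixed prefix plus the inline payload fits in
 * the preallocated message buffer, otherwise the PUT/GET RDMA protocol
 * is used.  All names below are invented; DEMO_MSG_SIZE stands in for
 * IBNAL_MSG_SIZE. */
#if 0
#include <stddef.h>
#include <stdio.h>

#define DEMO_MSG_SIZE 4096

typedef struct {
        unsigned long long hdr[12];     /* fixed prefix (hdr, cookies...) */
        char               payload[0];  /* inline payload starts here */
} demo_msg_t;

static int demo_fits_inline (int payload_nob)
{
        /* mirrors offsetof(kib_msg_t,
         *                  ibm_u.immediate.ibim_payload[payload_nob]) */
        size_t nob = offsetof(demo_msg_t, payload) + payload_nob;

        return nob <= DEMO_MSG_SIZE;
}

int main (void)
{
        printf("100 bytes inline? %d\n", demo_fits_inline(100));    /* 1 */
        printf("8192 bytes inline? %d\n", demo_fits_inline(8192));  /* 0 */
        return 0;
}
#endif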
1601
1602 ptl_err_t
1603 kibnal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
1604                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1605                unsigned int payload_niov, struct iovec *payload_iov,
1606                size_t payload_offset, size_t payload_len)
1607 {
1608         CDEBUG(D_NET, "  pid = %d, nid="LPU64"\n",
1609                pid, nid);
1610         return (kibnal_sendmsg(nal, private, cookie,
1611                                hdr, type, nid, pid,
1612                                payload_niov, payload_iov, NULL,
1613                                payload_offset, payload_len));
1614 }
1615
1616 ptl_err_t
1617 kibnal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, 
1618                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1619                      unsigned int payload_niov, ptl_kiov_t *payload_kiov, 
1620                      size_t payload_offset, size_t payload_len)
1621 {
1622         return (kibnal_sendmsg(nal, private, cookie,
1623                                hdr, type, nid, pid,
1624                                payload_niov, NULL, payload_kiov,
1625                                payload_offset, payload_len));
1626 }
1627
1628 ptl_err_t
1629 kibnal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
1630                  unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
1631                  size_t offset, int mlen, int rlen)
1632 {
1633         kib_rx_t    *rx = private;
1634         kib_msg_t   *rxmsg = rx->rx_msg;
1635         kib_conn_t  *conn = rx->rx_conn;
1636         kib_tx_t    *tx;
1637         kib_msg_t   *txmsg;
1638         int          nob;
1639         int          rc;
1640         
1641         LASSERT (mlen <= rlen);
1642         LASSERT (mlen >= 0);
1643         LASSERT (!in_interrupt());
1644         /* Either all pages or all vaddrs */
1645         LASSERT (!(kiov != NULL && iov != NULL));
1646
1647         switch (rxmsg->ibm_type) {
1648         default:
1649                 LBUG();
1650                 
1651         case IBNAL_MSG_IMMEDIATE:
1652                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
1653                 if (nob > IBNAL_MSG_SIZE) {
1654                         CERROR ("Immediate message from "LPX64" too big: %d\n",
1655                                 rxmsg->ibm_u.immediate.ibim_hdr.src_nid, rlen);
1656                         return (PTL_FAIL);
1657                 }
1658
1659                 if (kiov != NULL)
1660                         lib_copy_buf2kiov(niov, kiov, offset,
1661                                           rxmsg->ibm_u.immediate.ibim_payload,
1662                                           mlen);
1663                 else
1664                         lib_copy_buf2iov(niov, iov, offset,
1665                                          rxmsg->ibm_u.immediate.ibim_payload,
1666                                          mlen);
1667
1668                 lib_finalize (nal, NULL, libmsg, PTL_OK);
1669                 return (PTL_OK);
1670
1671         case IBNAL_MSG_PUT_REQ:
1672                 /* NB rx_complete() will send PUT_NAK when I return to it from
1673                  * here, unless I set rx_responded!  */
1674
1675                 if (mlen == 0) { /* No payload to RDMA */
1676                         lib_finalize(nal, NULL, libmsg, PTL_OK);
1677                         return PTL_OK;
1678                 }
1679
1680                 tx = kibnal_get_idle_tx(0);
1681                 if (tx == NULL) {
1682                         CERROR("Can't allocate tx for "LPX64"\n",
1683                                conn->ibc_peer->ibp_nid);
1684                         return PTL_FAIL;
1685                 }
1686
1687                 txmsg = tx->tx_msg;
1688                 if (kiov == NULL)
1689                         rc = kibnal_setup_rd_iov(tx, 
1690                                                  &txmsg->ibm_u.putack.ibpam_rd,
1691                                                  vv_acc_r_mem_write,
1692                                                  niov, iov, offset, mlen);
1693                 else
1694                         rc = kibnal_setup_rd_kiov(tx,
1695                                                   &txmsg->ibm_u.putack.ibpam_rd,
1696                                                   vv_acc_r_mem_write,
1697                                                   niov, kiov, offset, mlen);
1698                 if (rc != 0) {
1699                         CERROR("Can't setup PUT sink for "LPX64": %d\n",
1700                                conn->ibc_peer->ibp_nid, rc);
1701                         kibnal_tx_done(tx);
1702                         return PTL_FAIL;
1703                 }
1704
1705                 txmsg->ibm_u.putack.ibpam_src_cookie = rxmsg->ibm_u.putreq.ibprm_cookie;
1706                 txmsg->ibm_u.putack.ibpam_dst_cookie = tx->tx_cookie;
1707 #if IBNAL_USE_FMR
1708                 nob = sizeof(kib_putack_msg_t);
1709 #else
1710                 {
1711                         int n = tx->tx_msg->ibm_u.putack.ibpam_rd.rd_nfrag;
1712
1713                         nob = offsetof(kib_putack_msg_t, ibpam_rd.rd_frags[n]);
1714                 }
1715 #endif
1716                 kibnal_init_tx_msg(tx, IBNAL_MSG_PUT_ACK, nob);
1717
1718                 tx->tx_libmsg[0] = libmsg;      /* finalise libmsg on completion */
1719                 tx->tx_waiting = 1;             /* waiting for PUT_DONE */
1720                 kibnal_queue_tx(tx, conn);
1721
1722                 LASSERT (!rx->rx_responded);
1723                 rx->rx_responded = 1;
1724                 return PTL_OK;
1725
1726         case IBNAL_MSG_GET_REQ:
1727                 /* We get called here just to discard any junk after the
1728                  * GET hdr. */
1729                 LASSERT (libmsg == NULL);
1730                 lib_finalize (nal, NULL, libmsg, PTL_OK);
1731                 return (PTL_OK);
1732         }
1733 }
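
/* Editor's sketch (disabled): without FMR, GET_REQ and PUT_ACK carry a
 * variable number of RDMA fragment descriptors, which is why both paths
 * above size the wire message with offsetof() over the populated
 * fragment count instead of sending sizeof() of the whole union member.
 * Names below are invented for illustration. */
#if 0
typedef struct {
        __u32 df_nob;                   /* fragment length */
        __u64 df_addr;                  /* fragment address */
} demo_frag_t;

typedef struct {
        __u64        dm_cookie;
        __u32        dm_nfrag;          /* fragments actually in use */
        demo_frag_t  dm_frags[0];       /* only dm_nfrag entries sent */
} demo_rd_msg_t;

static int demo_rd_msg_nob (demo_rd_msg_t *msg)
{
        /* same idiom as offsetof(kib_get_msg_t, ibgm_rd.rd_frags[n]) */
        return offsetof(demo_rd_msg_t, dm_frags[msg->dm_nfrag]);
}
#endif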
1734
1735 ptl_err_t
1736 kibnal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
1737               unsigned int niov, struct iovec *iov, 
1738               size_t offset, size_t mlen, size_t rlen)
1739 {
1740         return (kibnal_recvmsg (nal, private, msg, niov, iov, NULL,
1741                                 offset, mlen, rlen));
1742 }
1743
1744 ptl_err_t
1745 kibnal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
1746                      unsigned int niov, ptl_kiov_t *kiov, 
1747                      size_t offset, size_t mlen, size_t rlen)
1748 {
1749         return (kibnal_recvmsg (nal, private, msg, niov, NULL, kiov,
1750                                 offset, mlen, rlen));
1751 }
1752
1753 int
1754 kibnal_thread_start (int (*fn)(void *arg), void *arg)
1755 {
1756         long    pid = kernel_thread (fn, arg, 0);
1757
1758         if (pid < 0)
1759                 return ((int)pid);
1760
1761         atomic_inc (&kibnal_data.kib_nthreads);
1762         return (0);
1763 }
1764
1765 void
1766 kibnal_thread_fini (void)
1767 {
1768         atomic_dec (&kibnal_data.kib_nthreads);
1769 }
1770
1771 void
1772 kibnal_schedule_conn (kib_conn_t *conn)
1773 {
1774         unsigned long flags;
1775
1776         kibnal_conn_addref(conn);               /* ++ref for connd */
1777         
1778         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
1779
1780         list_add_tail (&conn->ibc_list, &kibnal_data.kib_connd_conns);
1781         wake_up (&kibnal_data.kib_connd_waitq);
1782                 
1783         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
1784 }
1785
1786 void
1787 kibnal_close_conn_locked (kib_conn_t *conn, int error)
1788 {
1789         /* This just does the immediate housekeeping.  'error' is zero for a
1790          * normal shutdown which can happen only after the connection has been
1791          * established.  If the connection is established, schedule the
1792          * connection to be finished off by the connd.  Otherwise the connd is
1793          * already dealing with it (either to set it up or tear it down).
1794          * Caller holds kib_global_lock exclusively in irq context */
1795         kib_peer_t       *peer = conn->ibc_peer;
1796         
1797         LASSERT (error != 0 || conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
1798
1799         if (error != 0 && conn->ibc_comms_error == 0)
1800                 conn->ibc_comms_error = error;
1801
1802         if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
1803                 return; /* already being handled  */
1804         
1805         /* NB Can't take ibc_lock here (could be in IRQ context), without
1806          * risking deadlock, so access to ibc_{tx_queue,active_txs} is racey */
1807
1808         if (error == 0 &&
1809             list_empty(&conn->ibc_tx_queue) &&
1810             list_empty(&conn->ibc_active_txs)) {
1811                 CDEBUG(D_NET, "closing conn to "LPX64
1812                        " rx# "LPD64" tx# "LPD64"\n", 
1813                        peer->ibp_nid, conn->ibc_txseq, conn->ibc_rxseq);
1814         } else {
1815                 CERROR("Closing conn to "LPX64": error %d%s%s"
1816                        " rx# "LPD64" tx# "LPD64"\n",
1817                        peer->ibp_nid, error,
1818                        list_empty(&conn->ibc_tx_queue) ? "" : "(sending)",
1819                        list_empty(&conn->ibc_active_txs) ? "" : "(waiting)",
1820                        conn->ibc_txseq, conn->ibc_rxseq);
1821
1822 #if 0
1823                 /* can't skip down the queue without holding ibc_lock (see above) */
1824                 list_for_each(tmp, &conn->ibc_tx_queue) {
1825                         kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);
1826                         
1827                         CERROR("   queued tx type %x cookie "LPX64
1828                                " sending %d waiting %d ticks %ld/%d\n", 
1829                                tx->tx_msg->ibm_type, tx->tx_cookie, 
1830                                tx->tx_sending, tx->tx_waiting,
1831                                (long)(tx->tx_deadline - jiffies), HZ);
1832                 }
1833
1834                 list_for_each(tmp, &conn->ibc_active_txs) {
1835                         kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);
1836                         
1837                         CERROR("   active tx type %x cookie "LPX64
1838                                " sending %d waiting %d ticks %ld/%d\n", 
1839                                tx->tx_msg->ibm_type, tx->tx_cookie, 
1840                                tx->tx_sending, tx->tx_waiting,
1841                                (long)(tx->tx_deadline - jiffies), HZ);
1842                 }
1843 #endif
1844         }
1845
1846         list_del (&conn->ibc_list);
1847         
1848         if (list_empty (&peer->ibp_conns) &&    /* no more conns */
1849             peer->ibp_persistence == 0 &&       /* non-persistent peer */
1850             kibnal_peer_active(peer)) {         /* still in peer table */
1851                 kibnal_unlink_peer_locked (peer);
1852         }
1853
1854         kibnal_set_conn_state(conn, IBNAL_CONN_DISCONNECT1);
1855
1856         kibnal_schedule_conn(conn);
1857         kibnal_conn_decref(conn);               /* lose ibc_list's ref */
1858 }
1859
1860 void
1861 kibnal_close_conn (kib_conn_t *conn, int error)
1862 {
1863         unsigned long flags;
1864         
1865         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1866
1867         kibnal_close_conn_locked (conn, error);
1868         
1869         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1870 }
1871
1872 void
1873 kibnal_handle_early_rxs(kib_conn_t *conn)
1874 {
1875         unsigned long    flags;
1876         kib_rx_t        *rx;
1877
1878         LASSERT (!in_interrupt());
1879         LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
1880         
1881         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1882         while (!list_empty(&conn->ibc_early_rxs)) {
1883                 rx = list_entry(conn->ibc_early_rxs.next,
1884                                 kib_rx_t, rx_list);
1885                 list_del(&rx->rx_list);
1886                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1887                 
1888                 kibnal_handle_rx(rx);
1889                 
1890                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1891         }
1892         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1893 }
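
/* Editor's sketch (disabled): the loop above is the standard pattern for
 * draining a shared list when each entry must be handled without the
 * lock held -- unlink one entry while locked, unlock to process it, then
 * re-take the lock before testing the list again.  Invented names: */
#if 0
struct demo_item {
        struct list_head di_list;
};

static void demo_handle (struct demo_item *item);

static void demo_drain (spinlock_t *lock, struct list_head *queue)
{
        struct demo_item *item;

        spin_lock(lock);
        while (!list_empty(queue)) {
                item = list_entry(queue->next, struct demo_item, di_list);
                list_del(&item->di_list);       /* unlink while locked */
                spin_unlock(lock);

                demo_handle(item);              /* may block/take locks */

                spin_lock(lock);                /* re-take before retest */
        }
        spin_unlock(lock);
}
#endif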
1894
1895 void
1896 kibnal_conn_disconnected(kib_conn_t *conn)
1897 {
1898         LIST_HEAD        (zombies); 
1899         struct list_head *tmp;
1900         struct list_head *nxt;
1901         kib_tx_t         *tx;
1902
1903         /* I'm the connd */
1904         LASSERT (!in_interrupt());
1905         LASSERT (current == kibnal_data.kib_connd);
1906         LASSERT (conn->ibc_state >= IBNAL_CONN_INIT);
1907         
1908         kibnal_set_conn_state(conn, IBNAL_CONN_DISCONNECTED);
1909
1910         /* move QP to error state to make posted work items complete */
1911         kibnal_set_qp_state(conn, vv_qp_state_error);
1912
1913         spin_lock(&conn->ibc_lock);
1914
1915         /* Complete all tx descs not waiting for sends to complete.
1916          * NB we should be safe from RDMA now that the QP has changed state */
1917
1918         list_for_each_safe (tmp, nxt, &conn->ibc_tx_queue) {
1919                 tx = list_entry (tmp, kib_tx_t, tx_list);
1920
1921                 LASSERT (tx->tx_queued);
1922
1923                 tx->tx_status = -ECONNABORTED;
1924                 tx->tx_queued = 0;
1925                 tx->tx_waiting = 0;
1926                 
1927                 if (tx->tx_sending != 0)
1928                         continue;
1929
1930                 list_del (&tx->tx_list);
1931                 list_add (&tx->tx_list, &zombies);
1932         }
1933
1934         list_for_each_safe (tmp, nxt, &conn->ibc_active_txs) {
1935                 tx = list_entry (tmp, kib_tx_t, tx_list);
1936
1937                 LASSERT (!tx->tx_queued);
1938                 LASSERT (tx->tx_waiting ||
1939                          tx->tx_sending != 0);
1940
1941                 tx->tx_status = -ECONNABORTED;
1942                 tx->tx_waiting = 0;
1943                 
1944                 if (tx->tx_sending != 0)
1945                         continue;
1946
1947                 list_del (&tx->tx_list);
1948                 list_add (&tx->tx_list, &zombies);
1949         }
1950         
1951         spin_unlock(&conn->ibc_lock);
1952
1953         while (!list_empty(&zombies)) {
1954                 tx = list_entry (zombies.next, kib_tx_t, tx_list);
1955
1956                 list_del(&tx->tx_list);
1957                 kibnal_tx_done (tx);
1958         }
1959
1960         kibnal_handle_early_rxs(conn);
1961 }
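
/* Editor's sketch (disabled): the teardown above is two-phase.  Phase 1,
 * under ibc_lock: mark every tx aborted but collect onto a private list
 * only those with no send still in flight (the others are reaped by
 * their send completions).  Phase 2, after unlocking: complete the
 * private list, since tx completion must not run under ibc_lock.
 * Invented names: */
#if 0
struct demo_tx {
        struct list_head dt_list;
        int              dt_status;
        int              dt_sending;
};

static void demo_tx_done (struct demo_tx *tx);

static void demo_abort_txs (spinlock_t *lock, struct list_head *queue)
{
        LIST_HEAD        (zombies);
        struct list_head *tmp;
        struct list_head *nxt;
        struct demo_tx   *tx;

        spin_lock(lock);
        list_for_each_safe (tmp, nxt, queue) {
                tx = list_entry(tmp, struct demo_tx, dt_list);

                tx->dt_status = -ECONNABORTED;  /* phase 1: mark */
                if (tx->dt_sending != 0)
                        continue;               /* completion reaps it */

                list_del(&tx->dt_list);
                list_add(&tx->dt_list, &zombies);
        }
        spin_unlock(lock);

        while (!list_empty(&zombies)) {         /* phase 2: complete */
                tx = list_entry(zombies.next, struct demo_tx, dt_list);
                list_del(&tx->dt_list);
                demo_tx_done(tx);
        }
}
#endif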
1962
1963 void
1964 kibnal_peer_connect_failed (kib_peer_t *peer, int active)
1965 {
1966         LIST_HEAD        (zombies);     /* init: tested below even when no txs are stolen */
1967         kib_tx_t         *tx;
1968         unsigned long     flags;
1969
1970         /* Only the connd creates conns => single threaded */
1971         LASSERT (!in_interrupt());
1972         LASSERT (current == kibnal_data.kib_connd);
1973         LASSERT (peer->ibp_reconnect_interval >= IBNAL_MIN_RECONNECT_INTERVAL);
1974
1975         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1976
1977         if (active) {
1978                 LASSERT (peer->ibp_connecting != 0);
1979                 peer->ibp_connecting--;
1980         } else {
1981                 LASSERT (!kibnal_peer_active(peer));
1982         }
1983         
1984         if (peer->ibp_connecting != 0) {
1985                 /* another connection attempt under way (loopback?)... */
1986                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1987                 return;
1988         }
1989
1990         if (list_empty(&peer->ibp_conns)) {
1991                 /* Say when active connection can be re-attempted */
1992                 peer->ibp_reconnect_time = jiffies + peer->ibp_reconnect_interval;
1993                 /* Increase reconnection interval */
1994                 peer->ibp_reconnect_interval = MIN (peer->ibp_reconnect_interval * 2,
1995                                                     IBNAL_MAX_RECONNECT_INTERVAL);
1996         
1997                 /* Take peer's blocked transmits to complete with error */
1998                 list_add(&zombies, &peer->ibp_tx_queue);
1999                 list_del_init(&peer->ibp_tx_queue);
2000                 
2001                 if (kibnal_peer_active(peer) &&
2002                     (peer->ibp_persistence == 0)) {
2003                         /* failed connection attempt on non-persistent peer */
2004                         kibnal_unlink_peer_locked (peer);
2005                 }
2006         } else {
2007                 /* Can't have blocked transmits if there are connections */
2008                 LASSERT (list_empty(&peer->ibp_tx_queue));
2009         }
2010         
2011         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2012
2013         if (list_empty (&zombies)) 
2014                 return;
2015         
2016         CERROR ("Deleting messages for "LPX64": connection failed\n", peer->ibp_nid);
2017         do {
2018                 tx = list_entry (zombies.next, kib_tx_t, tx_list);
2019
2020                 list_del (&tx->tx_list);
2021                 /* complete now */
2022                 tx->tx_status = -EHOSTUNREACH;
2023                 kibnal_tx_done (tx);
2024         } while (!list_empty (&zombies));
2025 }
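
/* Editor's sketch (disabled): the "list_add() then list_del_init()" pair
 * above steals a whole shared queue in O(1).  Inserting the local head
 * right after the shared head splices it into the ring of entries;
 * deleting the shared head (re-initialising it empty) then leaves those
 * entries reachable only through the local head.  Invented names: */
#if 0
static void demo_steal_queue (struct list_head *shared,
                              struct list_head *mine)
{
        list_add(mine, shared);         /* 'mine' joins the ring first */
        list_del_init(shared);          /* 'shared' is now empty... */
        /* ...and all former entries now hang off 'mine' */
}
#endif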
2026
2027 void
2028 kibnal_connreq_done(kib_conn_t *conn, int active, int status)
2029 {
2030         static cm_reject_data_t   rej;
2031
2032         struct list_head   txs;
2033         kib_peer_t        *peer = conn->ibc_peer;
2034         kib_peer_t        *peer2;
2035         unsigned long      flags;
2036         kib_tx_t          *tx;
2037
2038         /* Only the connd creates conns => single threaded */
2039         LASSERT (!in_interrupt());
2040         LASSERT (current == kibnal_data.kib_connd);
2041         LASSERT (conn->ibc_state < IBNAL_CONN_ESTABLISHED);
2042
2043         if (active) {
2044                 LASSERT (peer->ibp_connecting > 0);
2045         } else {
2046                 LASSERT (!kibnal_peer_active(peer));
2047         }
2048         
2049         PORTAL_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
2050         conn->ibc_connvars = NULL;
2051
2052         if (status != 0) {
2053                 /* failed to establish connection */
2054                 switch (conn->ibc_state) {
2055                 default:
2056                         LBUG();
2057
2058                 case IBNAL_CONN_ACTIVE_CHECK_REPLY:
2059                         /* got a connection reply but failed checks */
2060                         LASSERT (active);
2061                         memset(&rej, 0, sizeof(rej));
2062                         rej.reason = cm_rej_code_usr_rej;
2063                         cm_reject(conn->ibc_cep, &rej);
2064                         break;
2065
2066                 case IBNAL_CONN_ACTIVE_CONNECT:
2067                         LASSERT (active);
2068                         cm_cancel(conn->ibc_cep);
2069                         kibnal_pause(HZ/10);
2070                         /* cm_connect() failed immediately or
2071                          * callback returned failure */
2072                         break;
2073
2074                 case IBNAL_CONN_ACTIVE_ARP:
2075                         LASSERT (active);
2076                         /* ibat_get_ib_data() failed immediately 
2077                          * or callback returned failure */
2078                         break;
2079
2080                 case IBNAL_CONN_INIT:
2081                         break;
2082
2083                 case IBNAL_CONN_PASSIVE_WAIT:
2084                         LASSERT (!active);
2085                         /* cm_accept callback returned failure */
2086                         break;
2087                 }
2088
2089                 kibnal_peer_connect_failed(conn->ibc_peer, active);
2090                 kibnal_conn_disconnected(conn);
2091                 return;
2092         }
2093
2094         /* connection established */
2095         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2096
2097         if (active) {
2098                 LASSERT(conn->ibc_state == IBNAL_CONN_ACTIVE_RTU);
2099         } else {
2100                 LASSERT(conn->ibc_state == IBNAL_CONN_PASSIVE_WAIT);
2101         }
2102         
2103         kibnal_set_conn_state(conn, IBNAL_CONN_ESTABLISHED);
2104
2105         if (!active) {
2106                 peer2 = kibnal_find_peer_locked(peer->ibp_nid);
2107                 if (peer2 != NULL) {
2108                         /* already in the peer table; swap */
2109                         conn->ibc_peer = peer2;
2110                         kibnal_peer_addref(peer2);
2111                         kibnal_peer_decref(peer);
2112                         peer = conn->ibc_peer;
2113                 } else {
2114                         /* add 'peer' to the peer table */
2115                         kibnal_peer_addref(peer);
2116                         list_add_tail(&peer->ibp_list,
2117                                       kibnal_nid2peerlist(peer->ibp_nid));
2118                 }
2119         }
2120         
2121         /* Add conn to peer's list and nuke any dangling conns from a different
2122          * peer instance... */
2123         kibnal_conn_addref(conn);               /* +1 ref for ibc_list */
2124         list_add(&conn->ibc_list, &peer->ibp_conns);
2125         kibnal_close_stale_conns_locked (conn->ibc_peer,
2126                                          conn->ibc_incarnation);
2127
2128         if (!kibnal_peer_active(peer) ||        /* peer has been deleted */
2129             conn->ibc_comms_error != 0 ||       /* comms error */
2130             conn->ibc_disconnect) {             /* need to disconnect */
2131                 
2132                 /* start to shut down connection */
2133                 kibnal_close_conn_locked(conn, -ECONNABORTED);
2134
2135                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2136                 kibnal_peer_connect_failed(peer, active);
2137                 return;
2138         }
2139
2140         if (active)
2141                 peer->ibp_connecting--;
2142
2143         /* grab pending txs while I have the lock */
2144         list_add(&txs, &peer->ibp_tx_queue);
2145         list_del_init(&peer->ibp_tx_queue);
2146         
2147         /* reset reconnect interval for next attempt */
2148         peer->ibp_reconnect_interval = IBNAL_MIN_RECONNECT_INTERVAL;
2149         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2150
2151         /* Schedule blocked txs */
2152         spin_lock (&conn->ibc_lock);
2153         while (!list_empty (&txs)) {
2154                 tx = list_entry (txs.next, kib_tx_t, tx_list);
2155                 list_del (&tx->tx_list);
2156
2157                 kibnal_queue_tx_locked (tx, conn);
2158         }
2159         spin_unlock (&conn->ibc_lock);
2160         kibnal_check_sends (conn);
2161
2162         /* schedule blocked rxs */
2163         kibnal_handle_early_rxs(conn);
2164 }
2165
2166 void
2167 kibnal_cm_callback(cm_cep_handle_t cep, cm_conn_data_t *cmdata, void *arg)
2168 {
2169         static cm_dreply_data_t drep;           /* just zeroed space */
2170         
2171         kib_conn_t             *conn = (kib_conn_t *)arg;
2172         unsigned long           flags;
2173         
2174         /* CAVEAT EMPTOR: tasklet context */
2175
2176         switch (cmdata->status) {
2177         default:
2178                 LBUG();
2179                 
2180         case cm_event_disconn_request:
2181                 /* IBNAL_CONN_ACTIVE_RTU:  gets closed in kibnal_connreq_done
2182                  * IBNAL_CONN_ESTABLISHED: I start it closing
2183                  * otherwise:              it's closing anyway */
2184                 cm_disconnect(conn->ibc_cep, NULL, &drep);
2185                 cm_cancel(conn->ibc_cep);
2186
2187                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2188                 LASSERT (!conn->ibc_disconnect);
2189                 conn->ibc_disconnect = 1;
2190
2191                 switch (conn->ibc_state) {
2192                 default:
2193                         LBUG();
2194
2195                 case IBNAL_CONN_ACTIVE_RTU:
2196                         /* kibnal_connreq_done is getting there; it'll see
2197                          * ibc_disconnect set... */
2198                         break;
2199
2200                 case IBNAL_CONN_ESTABLISHED:
2201                         /* kibnal_connreq_done got there already; get
2202                          * disconnect going... */
2203                         kibnal_close_conn_locked(conn, 0);
2204                         break;
2205
2206                 case IBNAL_CONN_DISCONNECT1:
2207                         /* kibnal_terminate_conn is getting there; it'll see
2208                          * ibc_disconnect set... */
2209                         break;
2210
2211                 case IBNAL_CONN_DISCONNECT2:
2212                         /* kibnal_terminate_conn got there already; complete
2213                          * the disconnect. */
2214                         kibnal_schedule_conn(conn);
2215                         break;
2216                 }
2217                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2218                 break;
2219                 
2220         case cm_event_disconn_timeout:
2221         case cm_event_disconn_reply:
2222                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2223                 LASSERT (conn->ibc_state == IBNAL_CONN_DISCONNECT2);
2224                 LASSERT (!conn->ibc_disconnect);
2225                 conn->ibc_disconnect = 1;
2226
2227                 /* kibnal_terminate_conn sent the disconnect request. */
2228                 kibnal_schedule_conn(conn);
2229
2230                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2231                 break;
2232                 
2233         case cm_event_connected:
2234         case cm_event_conn_timeout:
2235         case cm_event_conn_reject:
2236                 LASSERT (conn->ibc_state == IBNAL_CONN_PASSIVE_WAIT);
2237                 conn->ibc_connvars->cv_conndata = *cmdata;
2238
2239                 kibnal_schedule_conn(conn);
2240                 break;
2241         }
2242
2243         kibnal_conn_decref(conn); /* lose my ref */
2244 }
2245
2246 void
2247 kibnal_check_passive_wait(kib_conn_t *conn)
2248 {
2249         int     rc;
2250
2251         switch (conn->ibc_connvars->cv_conndata.status) {
2252         default:
2253                 LBUG();
2254                 
2255         case cm_event_connected:
2256                 kibnal_conn_addref(conn); /* ++ ref for CM callback */
2257                 rc = kibnal_set_qp_state(conn, vv_qp_state_rts);
2258                 if (rc != 0)
2259                         conn->ibc_comms_error = rc;
2260                 /* connection _has_ been established; it's just that we've had
2261                  * an error immediately... */
2262                 kibnal_connreq_done(conn, 0, 0);
2263                 break;
2264                 
2265         case cm_event_conn_timeout:
2266                 kibnal_connreq_done(conn, 0, -ETIMEDOUT);
2267                 break;
2268                 
2269         case cm_event_conn_reject:
2270                 kibnal_connreq_done(conn, 0, -ECONNRESET);
2271                 break;
2272         }
2273 }
2274
2275 void
2276 kibnal_recv_connreq(cm_cep_handle_t *cep, cm_request_data_t *cmreq)
2277 {
2278         static kib_msg_t        txmsg;
2279         static kib_msg_t        rxmsg;
2280         static cm_reply_data_t  reply;
2281         static cm_reject_data_t reject;
2282
2283         kib_conn_t         *conn = NULL;
2284         int                 rc = 0;
2285         int                 rxmsgnob;
2286         kib_connvars_t     *cv;
2287         kib_peer_t         *tmp_peer;
2288         cm_return_t         cmrc;
2289         vv_return_t         vvrc;
2290         
2291         /* I'm the connd executing in thread context
2292          * No concurrency problems with static data! */
2293         LASSERT (!in_interrupt());
2294         LASSERT (current == kibnal_data.kib_connd);
2295
2296         if (cmreq->sid != IBNAL_SERVICE_NUMBER) {
2297                 CERROR(LPX64" != IBNAL_SERVICE_NUMBER("LPX64")\n",
2298                        cmreq->sid, (__u64)IBNAL_SERVICE_NUMBER);
2299                 goto reject;
2300         }
2301
2302         /* copy into rxmsg to avoid alignment issues */
2303         rxmsgnob = MIN(cm_REQ_priv_data_len, sizeof(rxmsg));
2304         memcpy(&rxmsg, cmreq->priv_data, rxmsgnob);
2305
2306         rc = kibnal_unpack_msg(&rxmsg, rxmsgnob);
2307         if (rc != 0) {
2308                 CERROR("Can't parse connection request: %d\n", rc);
2309                 goto reject;
2310         }
2311
2312         if (rxmsg.ibm_type != IBNAL_MSG_CONNREQ) {
2313                 CERROR("Unexpected connreq msg type: %x from "LPX64"\n",
2314                        rxmsg.ibm_type, rxmsg.ibm_srcnid);
2315                 goto reject;
2316         }
2317
2318         if (rxmsg.ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid) {
2319                 CERROR("Can't accept "LPX64": bad dst nid "LPX64"\n",
2320                        rxmsg.ibm_srcnid, rxmsg.ibm_dstnid);
2321                 goto reject;
2322         }
2323
2324         if (rxmsg.ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
2325                 CERROR("Can't accept "LPX64": incompatible queue depth %d (%d wanted)\n",
2326                        rxmsg.ibm_srcnid, rxmsg.ibm_u.connparams.ibcp_queue_depth, 
2327                        IBNAL_MSG_QUEUE_SIZE);
2328                 goto reject;
2329         }
2330
2331         if (rxmsg.ibm_u.connparams.ibcp_max_msg_size > IBNAL_MSG_SIZE) {
2332                 CERROR("Can't accept "LPX64": message size %d too big (%d max)\n",
2333                        rxmsg.ibm_srcnid, rxmsg.ibm_u.connparams.ibcp_max_msg_size, 
2334                        IBNAL_MSG_SIZE);
2335                 goto reject;
2336         }
2337                 
2338         if (rxmsg.ibm_u.connparams.ibcp_max_frags > IBNAL_MAX_RDMA_FRAGS) {
2339                 CERROR("Can't accept "LPX64": max frags %d too big (%d max)\n",
2340                        rxmsg.ibm_srcnid, rxmsg.ibm_u.connparams.ibcp_max_frags, 
2341                        IBNAL_MAX_RDMA_FRAGS);
2342                 goto reject;
2343         }
2344                 
2345         conn = kibnal_create_conn(cep);
2346         if (conn == NULL) {
2347                 CERROR("Can't create conn for "LPX64"\n", rxmsg.ibm_srcnid);
2348                 goto reject;
2349         }
2350         
2351         /* assume 'rxmsg.ibm_srcnid' is a new peer */
2352         tmp_peer = kibnal_create_peer (rxmsg.ibm_srcnid);
2353         if (tmp_peer == NULL) {
2354                 CERROR("Can't create tmp peer for "LPX64"\n", rxmsg.ibm_srcnid);
2355                 kibnal_conn_decref(conn);
2356                 conn = NULL;
2357                 goto reject;
2358         }
2359
2360         conn->ibc_peer = tmp_peer;              /* conn takes over my ref */
2361         conn->ibc_incarnation = rxmsg.ibm_srcstamp;
2362         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2363
2364         cv = conn->ibc_connvars;
2365
2366         cv->cv_txpsn          = cmreq->cep_data.start_psn;
2367         cv->cv_remote_qpn     = cmreq->cep_data.qpn;
2368         cv->cv_path           = cmreq->path_data.path;
2369         cv->cv_rnr_count      = cmreq->cep_data.rtr_retry_cnt;
2370         // XXX                  cmreq->cep_data.retry_cnt;
2371         cv->cv_port           = cmreq->cep_data.local_port_num;
2372
2373         vvrc = gid2gid_index(kibnal_data.kib_hca, cv->cv_port,
2374                              &cv->cv_path.sgid, &cv->cv_sgid_index);
2375         LASSERT (vvrc == vv_return_ok);
2376         
2377         vvrc = pkey2pkey_index(kibnal_data.kib_hca, cv->cv_port,
2378                                cv->cv_path.pkey, &cv->cv_pkey_index);
2379         LASSERT (vvrc == vv_return_ok);
2380
2381         rc = kibnal_set_qp_state(conn, vv_qp_state_init);
2382         if (rc != 0)
2383                 goto reject;
2384
2385         rc = kibnal_post_receives(conn);
2386         if (rc != 0) {
2387                 CERROR("Can't post receives for "LPX64"\n", rxmsg.ibm_srcnid);
2388                 goto reject;
2389         }
2390
2391         rc = kibnal_set_qp_state(conn, vv_qp_state_rtr);
2392         if (rc != 0)
2393                 goto reject;
2394         
2395         memset(&reply, 0, sizeof(reply));
2396         reply.qpn                 = cv->cv_local_qpn;
2397         reply.qkey                = IBNAL_QKEY;
2398         reply.start_psn           = cv->cv_rxpsn;
2399         reply.arb_initiator_depth = IBNAL_ARB_INITIATOR_DEPTH;
2400         reply.arb_resp_res        = IBNAL_ARB_RESP_RES;
2401         reply.failover_accepted   = IBNAL_FAILOVER_ACCEPTED;
2402         reply.rnr_retry_count     = cv->cv_rnr_count;
2403         reply.targ_ack_delay      = kibnal_data.kib_hca_attrs.ack_delay;
2404         
2405         /* setup txmsg... */
2406         memset(&txmsg, 0, sizeof(txmsg));
2407         kibnal_init_msg(&txmsg, IBNAL_MSG_CONNACK, 
2408                         sizeof(txmsg.ibm_u.connparams));
2409         LASSERT (txmsg.ibm_nob <= cm_REP_priv_data_len);
2410         txmsg.ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
2411         txmsg.ibm_u.connparams.ibcp_max_msg_size = IBNAL_MSG_SIZE;
2412         txmsg.ibm_u.connparams.ibcp_max_frags = IBNAL_MAX_RDMA_FRAGS;
2413         kibnal_pack_msg(&txmsg, 0, rxmsg.ibm_srcnid, rxmsg.ibm_srcstamp, 0);
2414
2415         /* ...and copy into reply to avoid alignment issues */
2416         memcpy(&reply.priv_data, &txmsg, txmsg.ibm_nob);
2417
2418         kibnal_set_conn_state(conn, IBNAL_CONN_PASSIVE_WAIT);
2419         
2420         cmrc = cm_accept(conn->ibc_cep, &reply, NULL,
2421                          kibnal_cm_callback, conn);
2422
2423         if (cmrc == cm_stat_success)
2424                 return;                         /* callback has got my ref on conn */
2425
2426         /* back out state change (no callback happening) */
2427         kibnal_set_conn_state(conn, IBNAL_CONN_INIT);
2428         rc = -EIO;
2429                 
2430  reject:
2431         CERROR("Rejected connreq from "LPX64"\n", rxmsg.ibm_srcnid);
2432
2433         memset(&reject, 0, sizeof(reject));
2434         reject.reason = cm_rej_code_usr_rej;
2435         cm_reject(cep, &reject);
2436
2437         if (conn != NULL) {
2438                 LASSERT (rc != 0);
2439                 kibnal_connreq_done(conn, 0, rc);
2440         } else {
2441                 cm_destroy_cep(cep);
2442         }
2443 }
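
/* Editor's sketch (disabled): both directions above memcpy() the wire
 * message between the CM private-data area and an aligned kib_msg_t
 * rather than casting priv_data in place -- the private data sits at an
 * arbitrary offset inside the CM structures, so its __u64 fields may be
 * misaligned.  Invented names: */
#if 0
typedef struct {
        __u64 dw_magic;
        __u64 dw_srcnid;                /* 8-byte fields want alignment */
} demo_wire_msg_t;

static int demo_check (demo_wire_msg_t *msg, int nob);

static int demo_unpack_priv (char *priv_data, int priv_nob,
                             demo_wire_msg_t *msgbuf)
{
        int nob = MIN(priv_nob, (int)sizeof(*msgbuf));

        /* NOT: (demo_wire_msg_t *)priv_data -- possibly misaligned */
        memcpy(msgbuf, priv_data, nob);         /* align by copying */
        return demo_check(msgbuf, nob);         /* parse the aligned copy */
}
#endif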
2444
2445 void
2446 kibnal_listen_callback(cm_cep_handle_t cep, cm_conn_data_t *data, void *arg)
2447 {
2448         cm_request_data_t  *cmreq = &data->data.request;
2449         kib_pcreq_t        *pcr;
2450         unsigned long       flags;
2451         
2452         LASSERT (arg == NULL);
2453
2454         if (data->status != cm_event_conn_request) {
2455                 CERROR("status %d is not cm_event_conn_request\n",
2456                        data->status);
2457                 return;
2458         }
2459
2460         PORTAL_ALLOC_ATOMIC(pcr, sizeof(*pcr));
2461         if (pcr == NULL) {
2462                 CERROR("Can't allocate passive connreq\n");
2463
2464                 cm_reject(cep, &((cm_reject_data_t) /* NB RO struct */
2465                                  {.reason = cm_rej_code_no_res,}));
2466                 cm_destroy_cep(cep);
2467                 return;
2468         }
2469
2470         pcr->pcr_cep = cep;
2471         pcr->pcr_cmreq = *cmreq;
2472         
2473         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2474
2475         list_add_tail(&pcr->pcr_list, &kibnal_data.kib_connd_pcreqs);
2476         wake_up(&kibnal_data.kib_connd_waitq);
2477         
2478         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
2479 }
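
/* Editor's note (disabled sketch): the rejection above passes a C99
 * compound literal -- (cm_reject_data_t){.reason = ...} -- so this
 * tasklet-context path needs no cm_reject_data_t of its own once
 * allocation has already failed.  The long-hand equivalent: */
#if 0
static void demo_reject_no_res (cm_cep_handle_t cep)
{
        cm_reject_data_t rej;

        memset(&rej, 0, sizeof(rej));
        rej.reason = cm_rej_code_no_res;

        cm_reject(cep, &rej);
        cm_destroy_cep(cep);
}
#endif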
2480
2482 void
2483 kibnal_active_connect_callback (cm_cep_handle_t cep, cm_conn_data_t *cd, 
2484                                 void *arg)
2485 {
2486         /* CAVEAT EMPTOR: tasklet context */
2487         kib_conn_t       *conn = (kib_conn_t *)arg;
2488         kib_connvars_t   *cv = conn->ibc_connvars;
2489
2490         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_CONNECT);
2491         cv->cv_conndata = *cd;
2492
2493         kibnal_schedule_conn(conn);
2494         kibnal_conn_decref(conn);
2495 }
2496
2497 void
2498 kibnal_connect_conn (kib_conn_t *conn)
2499 {
2500         static cm_request_data_t  cmreq;
2501         static kib_msg_t          msg;
2502         
2503         kib_connvars_t           *cv = conn->ibc_connvars;
2504         kib_peer_t               *peer = conn->ibc_peer;
2505         cm_return_t               cmrc;
2506         
2507         /* Only called by connd => statics OK */
2508         LASSERT (!in_interrupt());
2509         LASSERT (current == kibnal_data.kib_connd);
2510         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_ARP);
2511
2512         memset(&cmreq, 0, sizeof(cmreq));
2513         
2514         cmreq.sid = IBNAL_SERVICE_NUMBER;
2515
2516         cmreq.cep_data.ca_guid              = kibnal_data.kib_hca_attrs.guid;
2517         cmreq.cep_data.qpn                  = cv->cv_local_qpn;
2518         cmreq.cep_data.retry_cnt            = IBNAL_RETRY_CNT;
2519         cmreq.cep_data.rtr_retry_cnt        = IBNAL_RNR_CNT;
2520         cmreq.cep_data.start_psn            = cv->cv_rxpsn;
2521         cmreq.cep_data.end_to_end_flow_ctrl = IBNAL_EE_FLOW_CNT;
2522         // XXX ack_timeout?
2523         // offered_resp_res
2524         // offered_initiator_depth
2525
2526         cmreq.path_data.subn_local  = IBNAL_LOCAL_SUB;
2527         cmreq.path_data.path        = cv->cv_path;
2528         
2529         /* setup msg... */
2530         memset(&msg, 0, sizeof(msg));
2531         kibnal_init_msg(&msg, IBNAL_MSG_CONNREQ, sizeof(msg.ibm_u.connparams));
2532         LASSERT(msg.ibm_nob <= cm_REQ_priv_data_len);
2533         msg.ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
2534         msg.ibm_u.connparams.ibcp_max_msg_size = IBNAL_MSG_SIZE;
2535         msg.ibm_u.connparams.ibcp_max_frags = IBNAL_MAX_RDMA_FRAGS;
2536         kibnal_pack_msg(&msg, 0, peer->ibp_nid, 0, 0);
2537
2538         /* ...and copy into cmreq to avoid alignment issues */
2539         memcpy(&cmreq.priv_data, &msg, msg.ibm_nob);
2540         
2541         CDEBUG(D_NET, "Connecting %p to "LPX64"\n", conn, peer->ibp_nid);
2542
2543         kibnal_conn_addref(conn);               /* ++ref for CM callback */
2544         kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_CONNECT);
2545
2546         cmrc = cm_connect(conn->ibc_cep, &cmreq, 
2547                           kibnal_active_connect_callback, conn);
2548         if (cmrc == cm_stat_success) {
2549                 CDEBUG(D_NET, "connection REQ sent to "LPX64"\n",
2550                        peer->ibp_nid);
2551                 return;
2552         }
2553
2554         CERROR ("Connect "LPX64" failed: %d\n", peer->ibp_nid, cmrc);
2555         kibnal_conn_decref(conn);       /* drop callback's ref */
2556         kibnal_connreq_done(conn, 1, -EHOSTUNREACH);
2557 }
2558
2559 void
2560 kibnal_check_connreply (kib_conn_t *conn)
2561 {
2562         static cm_rtu_data_t  rtu;
2563         static kib_msg_t      msg;
2564
2565         kib_connvars_t   *cv = conn->ibc_connvars;
2566         cm_reply_data_t  *reply = &cv->cv_conndata.data.reply;
2567         kib_peer_t       *peer = conn->ibc_peer;
2568         int               msgnob;
2569         cm_return_t       cmrc;
2570         cm_cep_handle_t   cep;
2571         unsigned long     flags;
2572         int               rc;
2573
2574         /* Only called by connd => statics OK */
2575         LASSERT (!in_interrupt());
2576         LASSERT (current == kibnal_data.kib_connd);
2577         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_CONNECT);
2578
2579         if (cv->cv_conndata.status == cm_event_conn_reply) {
2580                 cv->cv_remote_qpn = reply->qpn;
2581                 cv->cv_txpsn      = reply->start_psn;
2582                 // XXX              reply->targ_ack_delay;
2583                 cv->cv_rnr_count  = reply->rnr_retry_count;
2584
2585                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_CHECK_REPLY);
2586
2587                 /* copy into msg to avoid alignment issues */
2588                 msgnob = MIN(cm_REP_priv_data_len, sizeof(msg));
2589                 memcpy(&msg, &reply->priv_data, msgnob);
2590
2591                 rc = kibnal_unpack_msg(&msg, msgnob);
2592                 if (rc != 0) {
2593                         CERROR("Can't unpack reply from "LPX64"\n",
2594                                peer->ibp_nid);
2595                         kibnal_connreq_done(conn, 1, rc);
2596                         return;
2597                 }
2598
2599                 if (msg.ibm_type != IBNAL_MSG_CONNACK) {
2600                         CERROR("Unexpected message type %d from "LPX64"\n",
2601                                msg.ibm_type, peer->ibp_nid);
2602                         kibnal_connreq_done(conn, 1, -EPROTO);
2603                         return;
2604                 }
2605
2606                 if (msg.ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
2607                         CERROR(LPX64" has incompatible queue depth %d (%d wanted)\n",
2608                                peer->ibp_nid, msg.ibm_u.connparams.ibcp_queue_depth,
2609                                IBNAL_MSG_QUEUE_SIZE);
2610                         kibnal_connreq_done(conn, 1, -EPROTO);
2611                         return;
2612                 }
2613                 
2614                 if (msg.ibm_u.connparams.ibcp_max_msg_size > IBNAL_MSG_SIZE) {
2615                         CERROR(LPX64" max message size %d too big (%d max)\n",
2616                                peer->ibp_nid, msg.ibm_u.connparams.ibcp_max_msg_size, 
2617                                IBNAL_MSG_SIZE);
2618                         kibnal_connreq_done(conn, 1, -EPROTO);
2619                         return;
2620                 }
2621
2622                 if (msg.ibm_u.connparams.ibcp_max_frags > IBNAL_MAX_RDMA_FRAGS) {
2623                         CERROR(LPX64" max frags %d too big (%d max)\n",
2624                                peer->ibp_nid, msg.ibm_u.connparams.ibcp_max_frags, 
2625                                IBNAL_MAX_RDMA_FRAGS);
2626                         kibnal_connreq_done(conn, 1, -EPROTO);
2627                         return;
2628                 }
2629                 
2630                 read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2631                 rc = (msg.ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid ||
2632                       msg.ibm_dststamp != kibnal_data.kib_incarnation) ?
2633                      -ESTALE : 0;
2634                 read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2635                 if (rc != 0) {
2636                         CERROR("Stale connection reply from "LPX64"\n",
2637                                peer->ibp_nid);
2638                         kibnal_connreq_done(conn, 1, rc);
2639                         return;
2640                 }
2641
2642                 conn->ibc_incarnation = msg.ibm_srcstamp;
2643                 conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2644                 
2645                 rc = kibnal_post_receives(conn);
2646                 if (rc != 0) {
2647                         CERROR("Can't post receives for "LPX64"\n",
2648                                peer->ibp_nid);
2649                         kibnal_connreq_done(conn, 1, rc);
2650                         return;
2651                 }
2652                 
2653                 rc = kibnal_set_qp_state(conn, vv_qp_state_rtr);
2654                 if (rc != 0) {
2655                         kibnal_connreq_done(conn, 1, rc);
2656                         return;
2657                 }
2658                 
2659                 rc = kibnal_set_qp_state(conn, vv_qp_state_rts);
2660                 if (rc != 0) {
2661                         kibnal_connreq_done(conn, 1, rc);
2662                         return;
2663                 }
2664                 
2665                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_RTU);
2666                 kibnal_conn_addref(conn);       /* ++for CM callback */
2667                 
2668                 memset(&rtu, 0, sizeof(rtu));
2669                 cmrc = cm_accept(conn->ibc_cep, NULL, &rtu,
2670                                  kibnal_cm_callback, conn);
2671                 if (cmrc == cm_stat_success) {
2672                         /* Now I'm racing with disconnect signalled by
2673                          * kibnal_cm_callback */
2674                         kibnal_connreq_done(conn, 1, 0);
2675                         return;
2676                 }
2677
2678                 CERROR("cm_accept "LPX64" failed: %d\n", peer->ibp_nid, cmrc);
2679                 /* Back out of RTU: no callback coming */
2680                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_CHECK_REPLY);
2681                 kibnal_conn_decref(conn);
2682                 kibnal_connreq_done(conn, 1, -EIO);
2683                 return;
2684         }
2685
2686         if (cv->cv_conndata.status == cm_event_conn_reject) {
2687
2688                 if (cv->cv_conndata.data.reject.reason != cm_rej_code_stale_conn) {
2689                         CERROR("conn -> "LPX64" rejected: %d\n", peer->ibp_nid,
2690                                cv->cv_conndata.data.reject.reason);
2691                         kibnal_connreq_done(conn, 1, -ECONNREFUSED);
2692                         return;
2693                 }
2694
2695                 CWARN ("conn -> "LPX64" stale: retrying\n", peer->ibp_nid);
2696
2697                 cep = cm_create_cep(cm_cep_transp_rc);
2698                 if (cep == NULL) {
2699                         CERROR("Can't create new CEP\n");
2700                         kibnal_connreq_done(conn, 1, -ENOMEM);
2701                         return;
2702                 }
2703
2704                 cmrc = cm_cancel(conn->ibc_cep);
2705                 LASSERT (cmrc == cm_stat_success);
2706                 cmrc = cm_destroy_cep(conn->ibc_cep);
2707                 LASSERT (cmrc == cm_stat_success);
2708
2709                 conn->ibc_cep = cep;
2710
2711                 /* retry connect */
2712                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_ARP);
2713                 kibnal_connect_conn(conn);
2714                 return;
2715         }
2716
2717         CERROR("conn -> "LPX64" failed: %d\n", peer->ibp_nid,
2718                cv->cv_conndata.status);
2719         kibnal_connreq_done(conn, 1, -ECONNABORTED);
2720 }
2721
2722 void
2723 kibnal_arp_done (kib_conn_t *conn)
2724 {
2725         kib_peer_t           *peer = conn->ibc_peer;
2726         kib_connvars_t       *cv = conn->ibc_connvars;
2727         ibat_arp_data_t      *arp = &cv->cv_arp;
2728         ib_path_record_v2_t  *path = &cv->cv_path;
2729         vv_return_t           vvrc;
2730         int                   rc;
2731         unsigned long         flags;
2732
2733         LASSERT (!in_interrupt());
2734         LASSERT (current == kibnal_data.kib_connd);
2735         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_ARP);
2736         LASSERT (peer->ibp_arp_count > 0);
2737         
2738         if (cv->cv_arprc != ibat_stat_ok) {
2739                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2740                 peer->ibp_arp_count--;
2741                 if (peer->ibp_arp_count == 0) {
2742                         /* final ARP attempt failed */
2743                         write_unlock_irqrestore(&kibnal_data.kib_global_lock, 
2744                                                 flags);
2745                         CERROR("Arp "LPX64"@%u.%u.%u.%u failed: %d\n", 
2746                                peer->ibp_nid, HIPQUAD(peer->ibp_ip), 
2747                                cv->cv_arprc);
2748                 } else {
2749                         /* Retry ARP: ibp_connecting++ so terminating conn
2750                          * doesn't end peer's connection attempt */
2751                         peer->ibp_connecting++;
2752                         write_unlock_irqrestore(&kibnal_data.kib_global_lock, 
2753                                                 flags);
2754                         CWARN("Arp "LPX64"@%u.%u.%u.%u failed: %d "
2755                               "(%d attempts left)\n", 
2756                               peer->ibp_nid, HIPQUAD(peer->ibp_ip), 
2757                               cv->cv_arprc, peer->ibp_arp_count);
2758
2759                         kibnal_schedule_peer_arp(peer);
2760                 }
2761                 kibnal_connreq_done(conn, 1, -ENETUNREACH);
2762                 return;
2763         }
2764
2765         if ((arp->mask & IBAT_PRI_PATH_VALID) != 0) {
2766                 CDEBUG(D_NET, "Got valid path for "LPX64"\n", peer->ibp_nid);
2767
2768                 *path = *arp->primary_path;
2769
2770                 vvrc = base_gid2port_num(kibnal_data.kib_hca, &path->sgid,
2771                                          &cv->cv_port);
2772                 LASSERT (vvrc == vv_return_ok);
2773
2774                 vvrc = gid2gid_index(kibnal_data.kib_hca, cv->cv_port,
2775                                      &path->sgid, &cv->cv_sgid_index);
2776                 LASSERT (vvrc == vv_return_ok);
2777
2778                 vvrc = pkey2pkey_index(kibnal_data.kib_hca, cv->cv_port,
2779                                        path->pkey, &cv->cv_pkey_index);
2780                 LASSERT (vvrc == vv_return_ok);
2781
2782                 path->mtu = IBNAL_IB_MTU;
2783
2784         } else if ((arp->mask & IBAT_LID_VALID) != 0) {
2785                 CWARN("Creating new path record for "LPX64"@%u.%u.%u.%u\n",
2786                       peer->ibp_nid, HIPQUAD(peer->ibp_ip));
2787
2788                 cv->cv_pkey_index = IBNAL_PKEY_IDX;
2789                 cv->cv_sgid_index = IBNAL_SGID_IDX;
2790                 cv->cv_port = arp->local_port_num;
2791
2792                 memset(path, 0, sizeof(*path));
2793
2794                 vvrc = port_num2base_gid(kibnal_data.kib_hca, cv->cv_port,
2795                                          &path->sgid);
2796                 LASSERT (vvrc == vv_return_ok);
2797
2798                 vvrc = port_num2base_lid(kibnal_data.kib_hca, cv->cv_port,
2799                                          &path->slid);
2800                 LASSERT (vvrc == vv_return_ok);
2801
2802                 path->dgid          = arp->gid;
2803                 path->sl            = IBNAL_SERVICE_LEVEL;
2804                 path->dlid          = arp->lid;
2805                 path->mtu           = IBNAL_IB_MTU;
2806                 path->rate          = IBNAL_STATIC_RATE;
2807                 path->pkt_life_time = IBNAL_PKT_LIFETIME;
2808                 path->pkey          = IBNAL_PKEY;
2809                 path->traffic_class = IBNAL_TRAFFIC_CLASS;
2810         } else {
2811                 CERROR("Can't Arp "LPX64"@%u.%u.%u.%u: no PATH or LID\n", 
2812                        peer->ibp_nid, HIPQUAD(peer->ibp_ip));
2813                 kibnal_connreq_done(conn, 1, -ENETUNREACH);
2814                 return;
2815         }
2816
2817         rc = kibnal_set_qp_state(conn, vv_qp_state_init);
2818         if (rc != 0) {
2819                 kibnal_connreq_done(conn, 1, rc);
                     return;         /* must not fall through and connect a
                                      * conn whose request has just failed */
2820         }
2821
2822         /* do the actual connection request */
2823         kibnal_connect_conn(conn);
2824 }
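
/* Editor's summary (not in the original source): kibnal_arp_done()
 * handles three ARP outcomes: failure (give up with -ENETUNREACH or,
 * if attempts remain, bump ibp_connecting and reschedule the ARP); a
 * complete path record (use it verbatim, deriving port, source gid
 * index and pkey index from the source GID); or a bare LID (synthesize
 * a path record from compile-time defaults).  Both success cases
 * funnel into kibnal_connect_conn(). */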
2825
2826 void
2827 kibnal_arp_callback (ibat_stat_t arprc, ibat_arp_data_t *arp_data, void *arg)
2828 {
2829         /* CAVEAT EMPTOR: tasklet context */
2830         kib_conn_t      *conn = (kib_conn_t *)arg;
2831         kib_peer_t      *peer;

             /* check 'conn' before it is dereferenced */
             LASSERT (conn != NULL);
             LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_ARP);

             peer = conn->ibc_peer;
2832
2833         if (arprc != ibat_stat_ok)
2834                 CERROR("Arp "LPX64"@%u.%u.%u.%u failed: %d\n",
2835                        peer->ibp_nid, HIPQUAD(peer->ibp_ip), arprc);
2836         else
2837                 CDEBUG(D_NET, "Arp "LPX64"@%u.%u.%u.%u OK: LID %s PATH %s\n",
2838                        peer->ibp_nid, HIPQUAD(peer->ibp_ip), 
2839                        (arp_data->mask & IBAT_LID_VALID) == 0 ? "invalid" : "valid",
2840                        (arp_data->mask & IBAT_PRI_PATH_VALID) == 0 ? "invalid" : "valid");
2841
2845         conn->ibc_connvars->cv_arprc = arprc;
2846         if (arprc == ibat_stat_ok)
2847                 conn->ibc_connvars->cv_arp = *arp_data;
2848         
2849         kibnal_schedule_conn(conn);
2850         kibnal_conn_decref(conn);
2851 }
2852
2853 void
2854 kibnal_arp_peer (kib_peer_t *peer)
2855 {
2856         cm_cep_handle_t  cep;
2857         kib_conn_t      *conn;
2858         int              ibatrc;
2859
2860         /* Only the connd does this (i.e. single threaded) */
2861         LASSERT (current == kibnal_data.kib_connd);
2862         LASSERT (peer->ibp_connecting != 0);
2863         LASSERT (peer->ibp_arp_count > 0);
2864
2865         cep = cm_create_cep(cm_cep_transp_rc);
2866         if (cep == NULL) {
2867                 CERROR ("Can't create cep for conn->"LPX64"\n",
2868                         peer->ibp_nid);
2869                 kibnal_peer_connect_failed(peer, 1);
2870                 return;
2871         }
2872
2873         conn = kibnal_create_conn(cep);
2874         if (conn == NULL) {
2875                 CERROR ("Can't allocate conn->"LPX64"\n",
2876                         peer->ibp_nid);
2877                 cm_destroy_cep(cep);
2878                 kibnal_peer_connect_failed(peer, 1);
2879                 return;
2880         }
2881
2882         conn->ibc_peer = peer;
2883         kibnal_peer_addref(peer);
2884
2885         kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_ARP);
2886
2887         ibatrc = ibat_get_ib_data(htonl(peer->ibp_ip), INADDR_ANY, 
2888                                   ibat_paths_primary,
2889                                   &conn->ibc_connvars->cv_arp, 
2890                                   kibnal_arp_callback, conn, 0);
2891         CDEBUG(D_NET, "ibatrc %d\n", ibatrc);
2892         switch (ibatrc) {
2893         default:
2894                 LBUG();
2895                 
2896         case ibat_stat_pending:
2897                 /* NB callback has my ref on conn */
2898                 break;
2899                 
2900         case ibat_stat_ok:
2901         case ibat_stat_error:
2902         case ibat_stat_timeout:
2903         case ibat_stat_not_found:
2904                 /* Immediate return (ARP cache hit or failure) == no callback. 
2905                  * Do the next stage directly... */
2906                 conn->ibc_connvars->cv_arprc = ibatrc;
2907                 kibnal_arp_done(conn);
2908                 kibnal_conn_decref(conn);
2909                 break;
2910         }
2911 }
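
/* Editor's note (not in the original source; assumes, as the decrefs
 * above imply, that kibnal_create_conn() hands its caller one
 * reference): if ibat_get_ib_data() returns ibat_stat_pending, that
 * reference is donated to kibnal_arp_callback(), which drops it after
 * scheduling the conn; on any immediate return the ARP completion is
 * run inline and the reference dropped here instead.  Every path
 * performs exactly one decref. */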
2912
2913 int
2914 kibnal_conn_timed_out (kib_conn_t *conn)
2915 {
2916         kib_tx_t          *tx;
2917         struct list_head  *ttmp;
2918
2919         spin_lock(&conn->ibc_lock);
2920
2921         list_for_each (ttmp, &conn->ibc_tx_queue) {
2922                 tx = list_entry (ttmp, kib_tx_t, tx_list);
2923
2924                 LASSERT (tx->tx_queued);
2925
2926                 if (time_after_eq (jiffies, tx->tx_deadline)) {
2927                         spin_unlock(&conn->ibc_lock);
2928                         return 1;
2929                 }
2930         }
2931
2932         list_for_each (ttmp, &conn->ibc_active_txs) {
2933                 tx = list_entry (ttmp, kib_tx_t, tx_list);
2934
2935                 LASSERT (!tx->tx_queued);
2936                 LASSERT (tx->tx_waiting ||
2937                          tx->tx_sending != 0);
2938
2939                 if (time_after_eq (jiffies, tx->tx_deadline)) {
2940                         spin_unlock(&conn->ibc_lock);
2941                         return 1;
2942                 }
2943         }
2944
2945         spin_unlock(&conn->ibc_lock);
2946         return 0;
2947 }
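
/* Editor's sketch (not in the original source): the deadline tests in
 * kibnal_conn_timed_out() rely on time_after_eq() remaining correct
 * when jiffies wraps.  It compares the signed difference rather than
 * the raw counters; a minimal model of the same idea: */
#if 0   /* illustration only */
static int my_time_after_eq (unsigned long a, unsigned long b)
{
        /* true iff 'a' is at or after 'b', valid across wrap-around as
         * long as the two times are within LONG_MAX jiffies */
        return ((long)(a - b) >= 0);
}
#endif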
2948
2949 void
2950 kibnal_check_conns (int idx)
2951 {
2952         struct list_head  *peers = &kibnal_data.kib_peers[idx];
2953         struct list_head  *ptmp;
2954         kib_peer_t        *peer;
2955         kib_conn_t        *conn;
2956         struct list_head  *ctmp;
2957         unsigned long      flags;
2958
2959  again:
2960         /* NB. We expect to have a look at all the peers and not find any
2961          * rdmas to time out, so we just use a shared lock while we
2962          * take a look... */
2963         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2964
2965         list_for_each (ptmp, peers) {
2966                 peer = list_entry (ptmp, kib_peer_t, ibp_list);
2967
2968                 list_for_each (ctmp, &peer->ibp_conns) {
2969                         conn = list_entry (ctmp, kib_conn_t, ibc_list);
2970
2971                         LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);
2972
2973                         /* In case we have enough credits to return via a
2974                          * NOOP, but there were no non-blocking tx descs
2975                          * free to do it last time... */
2976                         kibnal_check_sends(conn);
2977
2978                         if (!kibnal_conn_timed_out(conn))
2979                                 continue;
2980
2981                         /* Handle timeout by closing the whole connection.  We
2982                          * can only be sure RDMA activity has ceased once the
2983                          * QP has been modified. */
2984                         
2985                         kibnal_conn_addref(conn); /* 1 ref for me... */
2986
2987                         read_unlock_irqrestore(&kibnal_data.kib_global_lock, 
2988                                                flags);
2989
2990                         CERROR("Timed out RDMA with "LPX64"\n",
2991                                peer->ibp_nid);
2992
2993                         kibnal_close_conn (conn, -ETIMEDOUT);
2994                         kibnal_conn_decref(conn); /* ...until here */
2995
2996                         /* start again now I've dropped the lock */
2997                         goto again;
2998                 }
2999         }
3000
3001         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
3002 }
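
/* Editor's note (not in the original source): the scan above uses the
 * usual "addref, drop lock, act, restart" idiom.  Closing a connection
 * can't safely happen under the shared lock, so the conn is pinned
 * with an extra reference, the lock is dropped, and the scan restarts
 * from the top of the hash chain since the lists may have changed
 * while the lock was out. */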
3003
3004 void
3005 kibnal_disconnect_conn (kib_conn_t *conn)
3006 {
3007         static cm_drequest_data_t dreq;         /* static just for the space;
                                                      * safe: only the (single-
                                                      * threaded) connd uses it */
3008         
3009         cm_return_t    cmrc;
3010         unsigned long  flags;
3011
3012         LASSERT (!in_interrupt());
3013         LASSERT (current == kibnal_data.kib_connd);
3014         
3015         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
3016
3017         if (conn->ibc_disconnect) {
3018                 /* Had the CM callback already */
3019                 write_unlock_irqrestore(&kibnal_data.kib_global_lock,
3020                                         flags);
3021                 kibnal_conn_disconnected(conn);
3022                 return;
3023         }
3024                 
3025         LASSERT (conn->ibc_state == IBNAL_CONN_DISCONNECT1);
3026
3027         /* active disconnect */
3028         cmrc = cm_disconnect(conn->ibc_cep, &dreq, NULL);
3029         if (cmrc == cm_stat_success) {
3030                 /* waiting for CM */
3031                 conn->ibc_state = IBNAL_CONN_DISCONNECT2;
3032                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
3033                 return;
3034         }
3035
3036         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
3037
3038         cm_cancel(conn->ibc_cep);
3039         kibnal_pause(HZ/10);
3040
3041         if (!conn->ibc_disconnect)              /* CM callback will never happen now */
3042                 kibnal_conn_decref(conn);
3043         
3044         LASSERT (atomic_read(&conn->ibc_refcount) > 0);
3045         LASSERT (conn->ibc_state == IBNAL_CONN_DISCONNECT1);
3046
3047         kibnal_conn_disconnected(conn);
3048 }
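
/* Editor's summary (not in the original source): disconnect states as
 * used above: IBNAL_CONN_DISCONNECT1 means connd still has to issue
 * the active cm_disconnect(); IBNAL_CONN_DISCONNECT2 means the
 * disconnect request was accepted and we are waiting for the CM
 * callback.  Once the callback has run (ibc_disconnect set), or is
 * known never to come, the conn is retired via
 * kibnal_conn_disconnected(). */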
3049
3050 int
3051 kibnal_connd (void *arg)
3052 {
3053         wait_queue_t       wait;
3054         unsigned long      flags;
3055         kib_pcreq_t       *pcr;
3056         kib_conn_t        *conn;
3057         kib_peer_t        *peer;
3058         int                timeout;
3059         int                i;
3060         int                dropped_lock;
3061         int                peer_index = 0;
3062         unsigned long      deadline = jiffies;
3063         
3064         kportal_daemonize ("kibnal_connd");
3065         kportal_blockallsigs ();
3066
3067         init_waitqueue_entry (&wait, current);
3068         kibnal_data.kib_connd = current;
3069
3070         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
3071
3072         while (!kibnal_data.kib_shutdown) {
3073
3074                 dropped_lock = 0;
3075
3076                 if (!list_empty (&kibnal_data.kib_connd_zombies)) {
3077                         conn = list_entry (kibnal_data.kib_connd_zombies.next,
3078                                            kib_conn_t, ibc_list);
3079                         list_del (&conn->ibc_list);
3080                         
3081                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
3082                         dropped_lock = 1;
3083
3084                         kibnal_destroy_conn(conn);
3085
3086                         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
3087                 }
3088
3089                 if (!list_empty (&kibnal_data.kib_connd_pcreqs)) {
3090                         pcr = list_entry(kibnal_data.kib_connd_pcreqs.next,
3091                                          kib_pcreq_t, pcr_list);
3092                         list_del(&pcr->pcr_list);
3093                         
3094                         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
3095                         dropped_lock = 1;
3096
3097                         kibnal_recv_connreq(pcr->pcr_cep, &pcr->pcr_cmreq);
3098                         PORTAL_FREE(pcr, sizeof(*pcr));
3099
3100                         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
3101                 }
3102                         
3103                 if (!list_empty (&kibnal_data.kib_connd_peers)) {
3104                         peer = list_entry (kibnal_data.kib_connd_peers.next,
3105                                            kib_peer_t, ibp_connd_list);
3106                         
3107                         list_del_init (&peer->ibp_connd_list);
3108                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
3109                         dropped_lock = 1;
3110
3111                         kibnal_arp_peer (peer);
3112                         kibnal_peer_decref (peer);
3113
3114                         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
3115                 }
3116
3117                 if (!list_empty (&kibnal_data.kib_connd_conns)) {
3118                         conn = list_entry (kibnal_data.kib_connd_conns.next,
3119                                            kib_conn_t, ibc_list);
3120                         list_del (&conn->ibc_list);
3121                         
3122                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
3123                         dropped_lock = 1;
3124
3125                         switch (conn->ibc_state) {
3126                         default:
3127                                 LBUG();
3128                                 
3129                         case IBNAL_CONN_ACTIVE_ARP:
3130                                 kibnal_arp_done(conn);
3131                                 break;
3132
3133                         case IBNAL_CONN_ACTIVE_CONNECT:
3134                                 kibnal_check_connreply(conn);
3135                                 break;
3136
3137                         case IBNAL_CONN_PASSIVE_WAIT:
3138                                 kibnal_check_passive_wait(conn);
3139                                 break;
3140
3141                         case IBNAL_CONN_DISCONNECT1:
3142                         case IBNAL_CONN_DISCONNECT2:
3143                                 kibnal_disconnect_conn(conn);
3144                                 break;
3145                         }
3146                         kibnal_conn_decref(conn);
3147
3148                         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
3149                 }
3150
3151                 /* careful with the jiffy wrap... */
3152                 timeout = (int)(deadline - jiffies);
3153                 if (timeout <= 0) {
3154                         const int n = 4;
3155                         const int p = 1;
3156                         int       chunk = kibnal_data.kib_peer_hash_size;
3157                         
3158                         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
3159                         dropped_lock = 1;
3160
3161                         /* Time to check for RDMA timeouts on a few more
3162                          * peers: I do checks every 'p' seconds on a
3163                          * proportion of the peer table and I need to check
3164                          * every connection 'n' times within a timeout
3165                          * interval, to ensure I detect a timeout on any
3166                          * connection within (n+1)/n times the timeout
3167                          * interval. */
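                        /* Editor's worked example (not in the original
                         * source): with a 101-bucket peer hash, n = 4,
                         * p = 1 and kib_io_timeout = 60s, chunk becomes
                         * (101 * 4 * 1) / 60 = 6, so ~6 buckets are
                         * scanned per second and the whole table every
                         * ~60/4 = 15s, i.e. each conn is checked about
                         * n times per timeout interval. */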
3168
3169                         if (kibnal_tunables.kib_io_timeout > n * p)
3170                                 chunk = (chunk * n * p) / 
3171                                         kibnal_tunables.kib_io_timeout;
3172                         if (chunk == 0)
3173                                 chunk = 1;
3174
3175                         for (i = 0; i < chunk; i++) {
3176                                 kibnal_check_conns (peer_index);
3177                                 peer_index = (peer_index + 1) % 
3178                                              kibnal_data.kib_peer_hash_size;
3179                         }
3180
3181                         deadline += p * HZ;
3182                         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
3183                 }
3184
3185                 if (dropped_lock)
3186                         continue;
3187                 
3188                 /* Nothing to do: sleep for 'timeout' jiffies or until woken */
3189                 set_current_state (TASK_INTERRUPTIBLE);
3190                 add_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
3191                 spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
3192
3193                 schedule_timeout (timeout);
3194
3195                 set_current_state (TASK_RUNNING);
3196                 remove_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
3197                 spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
3198         }
3199
3200         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
3201
3202         kibnal_thread_fini ();
3203         return (0);
3204 }
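
/* Editor's note (not in the original source): the idle path in
 * kibnal_connd() uses the lost-wakeup-safe sleep sequence: it sets
 * TASK_INTERRUPTIBLE and joins kib_connd_waitq *before* dropping
 * kib_connd_lock, so a wake_up() arriving between the unlock and
 * schedule_timeout() simply makes the sleep return immediately rather
 * than being missed. */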
3205
3206 void 
3207 kibnal_async_callback(vv_event_record_t ev)
3208 {
3209         CERROR("type: %d, port: %d, data: "LPX64"\n", 
3210                ev.event_type, ev.port_num, ev.type.data);
3211 }
3212
3213 void
3214 kibnal_cq_callback (unsigned long unused_context)
3215 {
3216         unsigned long    flags;
3217
3218         CDEBUG(D_NET, "!!\n");
3219
3220         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
3221         kibnal_data.kib_ready = 1;
3222         wake_up(&kibnal_data.kib_sched_waitq);
3223         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
3224 }
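
/* Editor's note (not in the original source): completion notification
 * here is edge-triggered -- the callback above fires once per arming,
 * and the scheduler re-arms via vv_request_completion_notification()
 * exactly when it finds the CQ empty.  kib_ready latches a pending
 * edge and kib_checking_cq elects a single poller, so no completion
 * can be lost between "CQ empty" and "re-armed". */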
3225
3226 int
3227 kibnal_scheduler(void *arg)
3228 {
3229         long            id = (long)arg;
3230         wait_queue_t    wait;
3231         char            name[16];
3232         vv_wc_t         wc;
3233         vv_return_t     vvrc;
3234         vv_return_t     vvrc2;
3235         unsigned long   flags;
3236         kib_rx_t       *rx;
3237         __u64           rxseq = 0;
3238         int             busy_loops = 0;
3239
3240         snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
3241         kportal_daemonize(name);
3242         kportal_blockallsigs();
3243
3244         init_waitqueue_entry(&wait, current);
3245
3246         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
3247
3248         while (!kibnal_data.kib_shutdown) {
3249                 if (busy_loops++ >= IBNAL_RESCHED) {
3250                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
3251                                                flags);
3252
3253                         our_cond_resched();
3254                         busy_loops = 0;
3255                         
3256                         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
3257                 }
3258
3259                 if (kibnal_data.kib_ready &&
3260                     !kibnal_data.kib_checking_cq) {
3261                         /* take ownership of completion polling */
3262                         kibnal_data.kib_checking_cq = 1;
3263                         /* Assume I'll exhaust the CQ */
3264                         kibnal_data.kib_ready = 0;
3265                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, 
3266                                                flags);
3267                         
3268                         vvrc = vv_poll_for_completion(kibnal_data.kib_hca, 
3269                                                       kibnal_data.kib_cq, &wc);
3270                         if (vvrc == vv_return_err_cq_empty) {
3271                                 vvrc2 = vv_request_completion_notification(
3272                                         kibnal_data.kib_hca, 
3273                                         kibnal_data.kib_cq, 
3274                                         vv_next_solicit_unsolicit_event);
3275                                 LASSERT (vvrc2 == vv_return_ok);
3276                         }
3277
3278                         if (vvrc == vv_return_ok &&
3279                             kibnal_wreqid2type(wc.wr_id) == IBNAL_WID_RX) {
3280                                 rx = (kib_rx_t *)kibnal_wreqid2ptr(wc.wr_id);
3281
3282                                 /* Grab the RX sequence number NOW before
3283                                  * anyone else can get an RX completion */
3284                                 rxseq = rx->rx_conn->ibc_rxseq++;
3285                         }
3286
3287                         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
3288                         /* give up ownership of completion polling */
3289                         kibnal_data.kib_checking_cq = 0;
3290
3291                         if (vvrc == vv_return_err_cq_empty)
3292                                 continue;
3293
3294                         LASSERT (vvrc == vv_return_ok);
3295                         /* Assume there's more: get another scheduler to check
3296                          * while I handle this completion... */
3297
3298                         kibnal_data.kib_ready = 1;
3299                         wake_up(&kibnal_data.kib_sched_waitq);
3300
3301                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
3302                                                flags);
3303
3304                         switch (kibnal_wreqid2type(wc.wr_id)) {
3305                         case IBNAL_WID_RX:
3306                                 kibnal_rx_complete(
3307                                         (kib_rx_t *)kibnal_wreqid2ptr(wc.wr_id),
3308                                         wc.completion_status,
3309                                         wc.num_bytes_transfered,
3310                                         rxseq);
3311                                 break;
3312
3313                         case IBNAL_WID_TX:
3314                                 kibnal_tx_complete(
3315                                         (kib_tx_t *)kibnal_wreqid2ptr(wc.wr_id),
3316                                         wc.completion_status);
3317                                 break;
3318
3319                         case IBNAL_WID_RDMA:
3320                                 /* We only get RDMA completion notification
3321                                  * when it fails; apart from logging it below
3322                                  * we take no action, because...
3323                                  *
3324                                  * 1) If an RDMA fails, all subsequent work
3325                                  * items, including the final SEND will fail
3326                                  * too, so I'm still guaranteed to notice that
3327                                  * this connection is hosed.
3328                                  *
3329                                  * 2) It's positively dangerous to look inside
3330                                  * the tx descriptor obtained from an RDMA work
3331                                  * item.  As soon as I drop the kib_sched_lock,
3332                                  * I give a scheduler on another CPU a chance
3333                                  * to get the final SEND completion, so the tx
3334                                  * descriptor can get freed as I inspect it. */
3335                                 CERROR ("RDMA failed: %d\n", 
3336                                         wc.completion_status);
3337                                 break;
3338
3339                         default:
3340                                 LBUG();
3341                         }
3342                         
3343                         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
3344                         continue;
3345                 }
3346
3347                 /* Nothing to do; sleep... */
3348
3349                 set_current_state(TASK_INTERRUPTIBLE);
3350                 add_wait_queue(&kibnal_data.kib_sched_waitq, &wait);
3351                 spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
3352                                        flags);
3353
3354                 schedule();
3355
3356                 remove_wait_queue(&kibnal_data.kib_sched_waitq, &wait);
3357                 set_current_state(TASK_RUNNING);
3358                 spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
3359         }
3360
3361         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
3362
3363         kibnal_thread_fini();
3364         return (0);
3365 }
3366
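/* Editor's sketch (not in the original source): kibnal_wreqid2type()
 * and kibnal_wreqid2ptr() are defined in vibnal.h; a 64-bit work
 * request id has to carry both a descriptor pointer and its kind, and
 * one plausible scheme tags the low bits of an aligned pointer.  All
 * names below are hypothetical, for illustration only: */
#if 0   /* illustration only */
#define MY_WID_TYPE_MASK        3ULL            /* 2 low bits: RX/TX/RDMA */

static inline __u64 my_wreqid_pack (void *desc, int type)
{
        /* descriptors are word-aligned, so the low 2 bits are free */
        LASSERT (((unsigned long)desc & MY_WID_TYPE_MASK) == 0);
        return ((__u64)(unsigned long)desc | (__u64)type);
}

static inline int my_wreqid2type (__u64 wid)
{
        return (int)(wid & MY_WID_TYPE_MASK);
}

static inline void *my_wreqid2ptr (__u64 wid)
{
        return (void *)(unsigned long)(wid & ~MY_WID_TYPE_MASK);
}
#endif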
3367
3368 lib_nal_t kibnal_lib = {
3369         .libnal_data = &kibnal_data,      /* NAL private data */
3370         .libnal_send = kibnal_send,
3371         .libnal_send_pages = kibnal_send_pages,
3372         .libnal_recv = kibnal_recv,
3373         .libnal_recv_pages = kibnal_recv_pages,
3374         .libnal_dist = kibnal_dist
3375 };