LU-7646 lnet: Stop Infinite CON RACE Condition
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/o2iblnd/o2iblnd_cb.c
37  *
38  * Author: Eric Barton <eric@bartonsoftware.com>
39  */
40
41 #include "o2iblnd.h"
42
/* NOTE(review): not referenced in the part of the file shown here; going by
 * the name, a cap on connection races tolerated before aborting - confirm at
 * the use site. */
#define MAX_CONN_RACES_BEFORE_ABORT 20

/* Forward declarations of file-local (static) helpers so they can be called
 * ahead of their definitions. */
static void kiblnd_peer_alive(kib_peer_t *peer);
static void kiblnd_peer_connect_failed(kib_peer_t *peer, int active, int error);
static void kiblnd_init_tx_msg(lnet_ni_t *ni, kib_tx_t *tx,
                               int type, int body_nob);
static int kiblnd_init_rdma(kib_conn_t *conn, kib_tx_t *tx, int type,
                            int resid, kib_rdma_desc_t *dstrd, __u64 dstcookie);
static void kiblnd_queue_tx_locked(kib_tx_t *tx, kib_conn_t *conn);
static void kiblnd_queue_tx(kib_tx_t *tx, kib_conn_t *conn);
static void kiblnd_unmap_tx(lnet_ni_t *ni, kib_tx_t *tx);
static void kiblnd_check_sends_locked(kib_conn_t *conn);
55
/*
 * Retire a finished tx: unmap it, drop its connection reference, return the
 * descriptor to its pool and finalise up to two attached LNet messages with
 * the tx status.
 *
 * The tx must not be queued, in flight or waiting for a peer response, and
 * the caller must not be in interrupt context (all asserted below).
 */
void
kiblnd_tx_done (lnet_ni_t *ni, kib_tx_t *tx)
{
        kib_net_t  *net = ni->ni_data;
        lnet_msg_t *pending[2];
        int         status;
        int         idx;

        LASSERT (net != NULL);
        LASSERT (!in_interrupt());
        LASSERT (!tx->tx_queued);               /* not on a send queue */
        LASSERT (tx->tx_sending == 0);          /* no send callback outstanding */
        LASSERT (!tx->tx_waiting);              /* no peer response outstanding */
        LASSERT (tx->tx_pool != NULL);

        kiblnd_unmap_tx(ni, tx);

        /* Detach both message slots and remember the status before the
         * descriptor goes back to the pool. */
        for (idx = 0; idx < 2; idx++) {
                pending[idx] = tx->tx_lntmsg[idx];
                tx->tx_lntmsg[idx] = NULL;
        }
        status = tx->tx_status;

        if (tx->tx_conn != NULL) {
                LASSERT (ni == tx->tx_conn->ibc_peer->ibp_ni);

                kiblnd_conn_decref(tx->tx_conn);
                tx->tx_conn = NULL;
        }

        /* Reset the fields that an idle descriptor is expected to have
         * cleared. */
        tx->tx_nwrq = 0;
        tx->tx_status = 0;

        kiblnd_pool_free_node(&tx->tx_pool->tpo_pool, &tx->tx_list);

        /* Finalise only after the descriptor has been released; from here on
         * only the local copies are used, never tx itself. */
        for (idx = 0; idx < 2; idx++) {
                if (pending[idx] != NULL)
                        lnet_finalize(ni, pending[idx], status);
        }
}
98
/*
 * Complete every tx on @txlist with @status.
 *
 * Each tx is unlinked, marked as no longer waiting for a peer response and
 * handed to kiblnd_tx_done(); the list is empty on return.
 */
void
kiblnd_txlist_done(lnet_ni_t *ni, struct list_head *txlist, int status)
{
        for (;;) {
                kib_tx_t *cur;

                if (list_empty(txlist))
                        break;

                cur = list_entry(txlist->next, kib_tx_t, tx_list);
                list_del(&cur->tx_list);

                /* force completion right away */
                cur->tx_waiting = 0;
                cur->tx_status = status;
                kiblnd_tx_done(ni, cur);
        }
}
114
/*
 * Take an idle tx descriptor from the tx pool set selected by
 * lnet_cpt_of_nid(@target).
 *
 * Returns NULL when the pool set cannot supply a node, otherwise a
 * descriptor that is asserted to be completely reset.
 */
static kib_tx_t *
kiblnd_get_idle_tx(lnet_ni_t *ni, lnet_nid_t target)
{
        kib_net_t        *net = (kib_net_t *)ni->ni_data;
        kib_tx_poolset_t *poolset = net->ibn_tx_ps[lnet_cpt_of_nid(target)];
        struct list_head *link = kiblnd_pool_alloc_node(&poolset->tps_poolset);
        kib_tx_t         *tx;

        if (link == NULL)
                return NULL;

        tx = container_of(link, kib_tx_t, tx_list);

        /* A descriptor coming out of the pool must carry no state from its
         * previous use. */
        LASSERT (tx->tx_nwrq == 0);
        LASSERT (!tx->tx_queued);
        LASSERT (tx->tx_sending == 0);
        LASSERT (!tx->tx_waiting);
        LASSERT (tx->tx_status == 0);
        LASSERT (tx->tx_conn == NULL);
        LASSERT (tx->tx_lntmsg[0] == NULL);
        LASSERT (tx->tx_lntmsg[1] == NULL);
        LASSERT (tx->tx_nfrags == 0);

        return tx;
}
141
/*
 * Stop using @rx: decrement its connection's ibc_nrx count under the
 * scheduler lock, then release one reference on the connection.
 */
static void
kiblnd_drop_rx(kib_rx_t *rx)
{
        kib_conn_t            *owner = rx->rx_conn;
        struct kib_sched_info *sinfo = owner->ibc_sched;
        unsigned long          irqflags;

        spin_lock_irqsave(&sinfo->ibs_lock, irqflags);
        LASSERT(owner->ibc_nrx > 0);
        owner->ibc_nrx--;
        spin_unlock_irqrestore(&sinfo->ibs_lock, irqflags);

        /* Done outside the lock. */
        kiblnd_conn_decref(owner);
}
156
/*
 * Post @rx's receive buffer on its connection's QP and, once the connection
 * has left its pre-ESTABLISHED state, optionally return a credit.
 *
 * @credit must be one of IBLND_POSTRX_NO_CREDIT, IBLND_POSTRX_PEER_CREDIT or
 * IBLND_POSTRX_RSRVD_CREDIT (asserted).
 *
 * Returns 0 when the rx is dropped because the connection is already past
 * IBLND_CONN_ESTABLISHED, otherwise the ib_post_recv() return code.  A post
 * failure outside the initial-post phase closes the connection and drops
 * the rx.
 */
int
kiblnd_post_rx (kib_rx_t *rx, int credit)
{
        kib_conn_t         *conn = rx->rx_conn;
        kib_net_t          *net = conn->ibc_peer->ibp_ni->ni_data;
        struct ib_recv_wr  *bad_wrq = NULL;
        struct ib_mr       *mr = conn->ibc_hdev->ibh_mrs;
        int                 rc;

        LASSERT (net != NULL);
        LASSERT (!in_interrupt());
        LASSERT (credit == IBLND_POSTRX_NO_CREDIT ||
                 credit == IBLND_POSTRX_PEER_CREDIT ||
                 credit == IBLND_POSTRX_RSRVD_CREDIT);
        LASSERT(mr != NULL);

        /* One scatter/gather element: always IBLND_MSG_SIZE bytes at
         * rx_msgaddr, using the device MR's local key. */
        rx->rx_sge.lkey   = mr->lkey;
        rx->rx_sge.addr   = rx->rx_msgaddr;
        rx->rx_sge.length = IBLND_MSG_SIZE;

        /* Single work request; wr_id encodes the rx pointer tagged as
         * IBLND_WID_RX. */
        rx->rx_wrq.next = NULL;
        rx->rx_wrq.sg_list = &rx->rx_sge;
        rx->rx_wrq.num_sge = 1;
        rx->rx_wrq.wr_id = kiblnd_ptr2wreqid(rx, IBLND_WID_RX);

        LASSERT (conn->ibc_state >= IBLND_CONN_INIT);
        LASSERT (rx->rx_nob >= 0);              /* not posted */

        if (conn->ibc_state > IBLND_CONN_ESTABLISHED) {
                kiblnd_drop_rx(rx);             /* No more posts for this rx */
                return 0;
        }

        rx->rx_nob = -1;                        /* flag posted */

        /* NB: need an extra reference after ib_post_recv because we don't
         * own this rx (and rx::rx_conn) anymore, LU-5678.
         */
        kiblnd_conn_addref(conn);
        rc = ib_post_recv(conn->ibc_cmid->qp, &rx->rx_wrq, &bad_wrq);
        if (unlikely(rc != 0)) {
                CERROR("Can't post rx for %s: %d, bad_wrq: %p\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid), rc, bad_wrq);
                rx->rx_nob = 0;                 /* post failed: clear the posted flag */
        }

        /* During the initial post the result is only reported to the caller;
         * no close, drop or credit handling happens here. */
        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) /* Initial post */
                goto out;

        if (unlikely(rc != 0)) {
                kiblnd_close_conn(conn, rc);
                kiblnd_drop_rx(rx);     /* No more posts for this rx */
                goto out;
        }

        if (credit == IBLND_POSTRX_NO_CREDIT)
                goto out;

        /* Account the returned credit under ibc_lock and let the send path
         * re-evaluate what it can post. */
        spin_lock(&conn->ibc_lock);
        if (credit == IBLND_POSTRX_PEER_CREDIT)
                conn->ibc_outstanding_credits++;
        else
                conn->ibc_reserved_credits++;
        kiblnd_check_sends_locked(conn);
        spin_unlock(&conn->ibc_lock);

out:
        /* Drop the extra reference taken before ib_post_recv(). */
        kiblnd_conn_decref(conn);
        return rc;
}
227
/*
 * Search @conn's active tx list for the tx that carries @cookie, is waiting
 * for a peer response and was sent as message type @txtype.
 *
 * Returns the matching tx or NULL.  A tx whose cookie matches but which is
 * not waiting, or has a different type, is logged as a bad completion and
 * the search continues.
 *
 * Both callers visible in this file hold conn->ibc_lock around the call.
 */
static kib_tx_t *
kiblnd_find_waiting_tx_locked(kib_conn_t *conn, int txtype, __u64 cookie)
{
        struct list_head *pos;

        list_for_each(pos, &conn->ibc_active_txs) {
                kib_tx_t *cand = list_entry(pos, kib_tx_t, tx_list);

                LASSERT(!cand->tx_queued);
                LASSERT(cand->tx_sending != 0 || cand->tx_waiting);

                if (cand->tx_cookie == cookie) {
                        if (cand->tx_waiting &&
                            cand->tx_msg->ibm_type == txtype)
                                return cand;

                        /* Cookie matched but state or type did not: the
                         * completion does not fit this tx. */
                        CWARN("Bad completion: %swaiting, type %x (wanted %x)\n",
                              cand->tx_waiting ? "" : "NOT ",
                              cand->tx_msg->ibm_type, txtype);
                }
        }
        return NULL;
}
252
/*
 * Apply a completion received from the peer to the tx it refers to.
 *
 * The tx is looked up by @txtype and @cookie under ibc_lock.  If none
 * matches, the connection is closed with -EPROTO.  Otherwise a negative
 * @status is recorded as the tx status (unless a status is already set),
 * the waiting flag is cleared, and the tx is finished when it is neither
 * queued nor still being sent.
 *
 * NOTE(review): the callers visible in this file take @status and @cookie
 * straight from the received message.  For IBLND_MSG_GET_REQ a non-negative
 * @status is passed to lnet_set_reply_msg_len() as the reply length with no
 * bound check in this function - confirm it is validated elsewhere.
 */
static void
kiblnd_handle_completion(kib_conn_t *conn, int txtype, int status, __u64 cookie)
{
        lnet_ni_t   *ni = conn->ibc_peer->ibp_ni;
        kib_tx_t    *tx;
        int          finished;

        spin_lock(&conn->ibc_lock);

        tx = kiblnd_find_waiting_tx_locked(conn, txtype, cookie);
        if (tx == NULL) {
                spin_unlock(&conn->ibc_lock);

                CWARN("Unmatched completion type %x cookie %#llx from %s\n",
                      txtype, cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                kiblnd_close_conn(conn, -EPROTO);
                return;
        }

        /* Only touch the outcome while the tx still reports success. */
        if (tx->tx_status == 0 && status < 0)
                tx->tx_status = status;
        else if (tx->tx_status == 0 && txtype == IBLND_MSG_GET_REQ)
                lnet_set_reply_msg_len(ni, tx->tx_lntmsg[1], status);

        tx->tx_waiting = 0;

        /* Unlink under the lock; complete after dropping it. */
        finished = (tx->tx_sending == 0) && !tx->tx_queued;
        if (finished)
                list_del(&tx->tx_list);

        spin_unlock(&conn->ibc_lock);

        if (finished)
                kiblnd_tx_done(ni, tx);
}
291
/*
 * Queue a completion message of the given @type, carrying @status and
 * @cookie, to the peer of @conn.
 *
 * If no idle tx is available the failure is logged and nothing is sent;
 * the caller is not told.
 */
static void
kiblnd_send_completion(kib_conn_t *conn, int type, int status, __u64 cookie)
{
        kib_peer_t  *peer = conn->ibc_peer;
        lnet_ni_t   *ni = peer->ibp_ni;
        kib_tx_t    *tx;

        tx = kiblnd_get_idle_tx(ni, peer->ibp_nid);
        if (tx == NULL) {
                CERROR("Can't get tx for completion %x for %s\n",
                       type, libcfs_nid2str(peer->ibp_nid));
                return;
        }

        /* Fill in the completion body, then the message header. */
        tx->tx_msg->ibm_u.completion.ibcm_status = status;
        tx->tx_msg->ibm_u.completion.ibcm_cookie = cookie;
        kiblnd_init_tx_msg(ni, tx, type, sizeof(kib_completion_msg_t));

        kiblnd_queue_tx(tx, conn);
}
310
/*
 * Dispatch one received message on a connection that is established (or
 * later, as asserted).
 *
 * Credits carried in the message are added to the connection first; a count
 * that would push ibc_credits above ibc_queue_depth is treated as a protocol
 * error and closes the connection.  The message is then handled by type, and
 * the rx is re-posted with the credit class chosen for that type unless it
 * selected IBLND_POSTRX_DONT_POST.
 *
 * Every field read from rx->rx_msg originates from the peer.
 * NOTE(review): kiblnd_rx_complete() calls kiblnd_unpack_msg() before
 * calling this function; confirm that it validates the per-type union
 * members used below, and that any other caller does the same.
 */
static void
kiblnd_handle_rx (kib_rx_t *rx)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        lnet_ni_t    *ni = conn->ibc_peer->ibp_ni;
        int           credits = msg->ibm_credits;
        kib_tx_t     *tx;
        int           rc = 0;
        int           rc2;
        int           post_credit;

        LASSERT (conn->ibc_state >= IBLND_CONN_ESTABLISHED);

        CDEBUG (D_NET, "Received %x[%d] from %s\n",
                msg->ibm_type, credits,
                libcfs_nid2str(conn->ibc_peer->ibp_nid));

        if (credits != 0) {
                /* Have I received credits that will let me send? */
                spin_lock(&conn->ibc_lock);

                /* Refuse a grant that would exceed the queue depth. */
                if (conn->ibc_credits + credits >
                    conn->ibc_queue_depth) {
                        /* rc2 only snapshots ibc_credits so it can be logged
                         * after the lock is dropped */
                        rc2 = conn->ibc_credits;
                        spin_unlock(&conn->ibc_lock);

                        CERROR("Bad credits from %s: %d + %d > %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid),
                               rc2, credits,
                               conn->ibc_queue_depth);

                        kiblnd_close_conn(conn, -EPROTO);
                        /* kiblnd_post_rx() drops the rx instead of posting it
                         * when the connection is past ESTABLISHED */
                        kiblnd_post_rx(rx, IBLND_POSTRX_NO_CREDIT);
                        return;
                }

                conn->ibc_credits += credits;

                /* This ensures the credit taken by NOOP can be returned */
                if (msg->ibm_type == IBLND_MSG_NOOP &&
                    !IBLND_OOB_CAPABLE(conn->ibc_version)) /* v1 only */
                        conn->ibc_outstanding_credits++;

                kiblnd_check_sends_locked(conn);
                spin_unlock(&conn->ibc_lock);
        }

        /* Every case below sets post_credit; rc < 0 marks a protocol error. */
        switch (msg->ibm_type) {
        default:
                /* Unknown type from the peer: protocol error. */
                CERROR("Bad IBLND message type %x from %s\n",
                       msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                post_credit = IBLND_POSTRX_NO_CREDIT;
                rc = -EPROTO;
                break;

        case IBLND_MSG_NOOP:
                if (IBLND_OOB_CAPABLE(conn->ibc_version)) {
                        post_credit = IBLND_POSTRX_NO_CREDIT;
                        break;
                }

                if (credits != 0) /* credit already posted */
                        post_credit = IBLND_POSTRX_NO_CREDIT;
                else              /* a keepalive NOOP */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_IMMEDIATE:
                /* The rx is handed to lnet_parse(); it is re-posted here only
                 * if parsing fails. */
                post_credit = IBLND_POSTRX_DONT_POST;
                rc = lnet_parse(ni, &msg->ibm_u.immediate.ibim_hdr,
                                msg->ibm_srcnid, rx, 0);
                if (rc < 0)                     /* repost on error */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_PUT_REQ:
                post_credit = IBLND_POSTRX_DONT_POST;
                rc = lnet_parse(ni, &msg->ibm_u.putreq.ibprm_hdr,
                                msg->ibm_srcnid, rx, 1);
                if (rc < 0)                     /* repost on error */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_PUT_NAK:
                /* The peer refused a PUT: complete the waiting PUT_REQ with
                 * the status and cookie it sent. */
                CWARN ("PUT_NACK from %s\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                post_credit = IBLND_POSTRX_RSRVD_CREDIT;
                kiblnd_handle_completion(conn, IBLND_MSG_PUT_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBLND_MSG_PUT_ACK:
                post_credit = IBLND_POSTRX_RSRVD_CREDIT;

                /* Find the PUT_REQ this ACK answers and take it off the
                 * active list while holding ibc_lock. */
                spin_lock(&conn->ibc_lock);
                tx = kiblnd_find_waiting_tx_locked(conn, IBLND_MSG_PUT_REQ,
                                        msg->ibm_u.putack.ibpam_src_cookie);
                if (tx != NULL)
                        list_del(&tx->tx_list);
                spin_unlock(&conn->ibc_lock);

                if (tx == NULL) {
                        CERROR("Unmatched PUT_ACK from %s\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        rc = -EPROTO;
                        break;
                }

                LASSERT (tx->tx_waiting);
                /* CAVEAT EMPTOR: I could be racing with tx_complete, but...
                 * (a) I can overwrite tx_msg since my peer has received it!
                 * (b) tx_waiting set tells tx_complete() it's not done. */

                tx->tx_nwrq = 0;                /* overwrite PUT_REQ */

                /* The RDMA descriptor and destination cookie come from the
                 * peer's PUT_ACK.
                 * NOTE(review): a failure here is only logged and the tx is
                 * queued anyway - presumably kiblnd_init_rdma() leaves the tx
                 * in a sendable state that reports the error; confirm. */
                rc2 = kiblnd_init_rdma(conn, tx, IBLND_MSG_PUT_DONE,
                                       kiblnd_rd_size(&msg->ibm_u.putack.ibpam_rd),
                                       &msg->ibm_u.putack.ibpam_rd,
                                       msg->ibm_u.putack.ibpam_dst_cookie);
                if (rc2 < 0)
                        CERROR("Can't setup rdma for PUT to %s: %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc2);

                spin_lock(&conn->ibc_lock);
                tx->tx_waiting = 0;     /* clear waiting and queue atomically */
                kiblnd_queue_tx_locked(tx, conn);
                spin_unlock(&conn->ibc_lock);
                break;

        case IBLND_MSG_PUT_DONE:
                post_credit = IBLND_POSTRX_PEER_CREDIT;
                kiblnd_handle_completion(conn, IBLND_MSG_PUT_ACK,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBLND_MSG_GET_REQ:
                post_credit = IBLND_POSTRX_DONT_POST;
                rc = lnet_parse(ni, &msg->ibm_u.get.ibgm_hdr,
                                msg->ibm_srcnid, rx, 1);
                if (rc < 0)                     /* repost on error */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_GET_DONE:
                post_credit = IBLND_POSTRX_RSRVD_CREDIT;
                kiblnd_handle_completion(conn, IBLND_MSG_GET_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;
        }

        if (rc < 0)                             /* protocol error */
                kiblnd_close_conn(conn, rc);

        if (post_credit != IBLND_POSTRX_DONT_POST)
                kiblnd_post_rx(rx, post_credit);
}
471
/*
 * Handle the completion of a posted receive.
 *
 * @status is compared with IB_WC_SUCCESS and @nob is stored as the received
 * byte count.  The message is unpacked and checked against the identity of
 * this connection before it is processed; any failure closes the connection
 * and drops the rx without re-posting it.  A completion that arrives once
 * the connection is past ESTABLISHED is dropped without further checks.
 */
static void
kiblnd_rx_complete (kib_rx_t *rx, int status, int nob)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        lnet_ni_t    *ni = conn->ibc_peer->ibp_ni;
        kib_net_t    *net = ni->ni_data;
        int           rc;
        int           err = -EIO;       /* close reason unless overridden below */

        LASSERT (net != NULL);
        LASSERT (rx->rx_nob < 0);               /* was posted */
        rx->rx_nob = 0;                         /* isn't now */

        if (conn->ibc_state > IBLND_CONN_ESTABLISHED)
                goto ignore;

        if (status != IB_WC_SUCCESS) {
                CNETERR("Rx from %s failed: %d\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid), status);
                goto failed;
        }

        LASSERT (nob >= 0);
        rx->rx_nob = nob;

        /* NOTE(review): presumably this is where the received length and the
         * wire format are validated - kiblnd_unpack_msg() is not visible
         * here. */
        rc = kiblnd_unpack_msg(msg, rx->rx_nob);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from %s\n",
                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                goto failed;
        }

        /* The source/destination NIDs and incarnation stamps in the message
         * must match this connection's peer and this network; anything else
         * is rejected as stale. */
        if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
            msg->ibm_dstnid != ni->ni_nid ||
            msg->ibm_srcstamp != conn->ibc_incarnation ||
            msg->ibm_dststamp != net->ibn_incarnation) {
                CERROR ("Stale rx from %s\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
                err = -ESTALE;
                goto failed;
        }

        /* set time last known alive */
        kiblnd_peer_alive(conn->ibc_peer);

        /* racing with connection establishment/teardown! */

        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
                rwlock_t  *g_lock = &kiblnd_data.kib_global_lock;
                unsigned long  flags;

                write_lock_irqsave(g_lock, flags);
                /* must check holding global lock to eliminate race */
                if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
                        /* Not established yet: park the rx on ibc_early_rxs.
                         * NOTE(review): presumably handled once the
                         * connection is established - not visible here. */
                        list_add_tail(&rx->rx_list, &conn->ibc_early_rxs);
                        write_unlock_irqrestore(g_lock, flags);
                        return;
                }
                write_unlock_irqrestore(g_lock, flags);
        }
        kiblnd_handle_rx(rx);
        return;

 failed:
        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        kiblnd_close_conn(conn, err);
 ignore:
        kiblnd_drop_rx(rx);                     /* Don't re-post rx. */
}
542
/*
 * Translate a kernel virtual address into its struct page.
 *
 * vmalloc addresses are resolved with vmalloc_to_page(), all others with
 * virt_to_page().  With CONFIG_HIGHMEM, an address inside the pkmap window
 * is treated as a fatal error (LBUG).  The result is asserted non-NULL.
 */
static struct page *
kiblnd_kvaddr_to_page (unsigned long vaddr)
{
        struct page *page;

        if (!is_vmalloc_addr((void *)vaddr)) {
#ifdef CONFIG_HIGHMEM
                if (vaddr >= PKMAP_BASE &&
                    vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE)) {
                        /* No highmem pages only used for bulk (kiov) I/O */
                        CERROR("find page for address in highmem\n");
                        LBUG();
                }
#endif
                page = virt_to_page (vaddr);
        } else {
                page = vmalloc_to_page ((void *)vaddr);
        }

        LASSERT (page != NULL);
        return page;
}
565
/*
 * Map @tx through the FMR pool set that belongs to the CPT of the tx's own
 * pool, and describe the result in @rd as a single fragment of @nob bytes.
 *
 * Returns 0 on success or the kiblnd_fmr_pool_map() error code.
 */
static int
kiblnd_fmr_map_tx(kib_net_t *net, kib_tx_t *tx, kib_rdma_desc_t *rd, __u32 nob)
{
        kib_hca_dev_t     *hdev;
        kib_fmr_poolset_t *poolset;
        int                cpt_idx;
        int                err;

        LASSERT(tx->tx_pool != NULL);
        LASSERT(tx->tx_pool->tpo_pool.po_owner != NULL);

        hdev = tx->tx_pool->tpo_hdev;
        cpt_idx = tx->tx_pool->tpo_pool.po_owner->ps_cpt;
        poolset = net->ibn_fmr_ps[cpt_idx];

        err = kiblnd_fmr_pool_map(poolset, tx, rd, nob, 0, &tx->fmr);
        if (err != 0) {
                CERROR("Can't map %u pages: %d\n", nob, err);
                return err;
        }

        /* When rd is not tx_rd it is sent to the peer, which needs the
         * rkey. */
        rd->rd_key = tx->fmr.fmr_key;

        /* Collapse the descriptor to one fragment: the first address is
         * masked with ~ibh_page_mask and the length is the whole transfer. */
        rd->rd_frags[0].rf_addr &= ~hdev->ibh_page_mask;
        rd->rd_frags[0].rf_nob   = nob;
        rd->rd_nfrags = 1;

        return 0;
}
596
/*
 * Undo the mappings of @tx: release its FMR mapping when the network has
 * FMR pool sets, then DMA-unmap its scatterlist if any fragments are mapped.
 * tx_nfrags is 0 on return.
 */
static void
kiblnd_unmap_tx(lnet_ni_t *ni, kib_tx_t *tx)
{
        kib_net_t *net = ni->ni_data;

        LASSERT(net != NULL);

        if (net->ibn_fmr_ps != NULL)
                kiblnd_fmr_pool_unmap(&tx->fmr, tx->tx_status);

        /* Nothing DMA-mapped: done. */
        if (tx->tx_nfrags == 0)
                return;

        kiblnd_dma_unmap_sg(tx->tx_pool->tpo_hdev->ibh_ibdev,
                            tx->tx_frags, tx->tx_nfrags, tx->tx_dmadir);
        tx->tx_nfrags = 0;
}
613
/*
 * DMA-map the first @nfrags entries of @tx's scatterlist and fill @rd with
 * the resulting fragment addresses and lengths, then pick a memory key.
 *
 * A pre-mapped MR found by kiblnd_find_rd_dma_mr() is used if there is one;
 * otherwise the tx is mapped through FMR when the network has FMR pool sets.
 *
 * Returns 0 on success, the kiblnd_fmr_map_tx() result, or -EINVAL when
 * neither mapping method is available.
 *
 * NOTE(review): the value returned by kiblnd_dma_map_sg() is stored in
 * rd_nfrags without being checked; if it can be 0 on failure, the function
 * carries on with an empty descriptor - confirm.
 */
static int
kiblnd_map_tx(lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd, int nfrags)
{
        kib_net_t     *net     = ni->ni_data;
        kib_hca_dev_t *hdev    = net->ibn_dev->ibd_hdev;
        /* rd that is not tx_rd is sent to the peer, making this side the
         * RDMA sink */
        int            to_peer = (rd != tx->tx_rd);
        struct ib_mr  *mr;
        __u32          nob = 0;
        int            i = 0;

        tx->tx_dmadir = to_peer ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
        tx->tx_nfrags = nfrags;

        rd->rd_nfrags = kiblnd_dma_map_sg(hdev->ibh_ibdev, tx->tx_frags,
                                          tx->tx_nfrags, tx->tx_dmadir);

        /* Copy each mapped segment into the descriptor and total the
         * length. */
        while (i < rd->rd_nfrags) {
                rd->rd_frags[i].rf_nob  = kiblnd_sg_dma_len(
                        hdev->ibh_ibdev, &tx->tx_frags[i]);
                rd->rd_frags[i].rf_addr = kiblnd_sg_dma_address(
                        hdev->ibh_ibdev, &tx->tx_frags[i]);
                nob += rd->rd_frags[i].rf_nob;
                i++;
        }

        mr = kiblnd_find_rd_dma_mr(ni, rd,
                                   (tx->tx_conn != NULL) ?
                                   tx->tx_conn->ibc_max_frags : -1);
        if (mr == NULL) {
                /* No pre-mapped MR: FMR is the only alternative. */
                if (net->ibn_fmr_ps != NULL)
                        return kiblnd_fmr_map_tx(net, tx, rd, nob);

                return -EINVAL;
        }

        /* Pre-mapped MR: the peer gets the rkey, local use takes the lkey. */
        rd->rd_key = to_peer ? mr->rkey : mr->lkey;
        return 0;
}
653
654
/*
 * Build @tx's scatterlist from a kvec array and map it for RDMA.
 *
 * Skips @offset bytes into @iov, then adds one scatterlist entry per
 * fragment, none crossing a page boundary, until @nob bytes are covered,
 * and finally calls kiblnd_map_tx() with the number of entries used.
 *
 * Returns the kiblnd_map_tx() result, or -EFAULT if a page cannot be found
 * or the scatterlist runs out of entries.
 */
static int
kiblnd_setup_rd_iov(lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd,
                    unsigned int niov, struct kvec *iov, int offset, int nob)
{
        kib_net_t          *net = ni->ni_data;
        struct page        *page;
        struct scatterlist *sg;
        unsigned long       vaddr;
        int                 fragnob;
        int                 page_offset;

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (net != NULL);

        /* Skip the kvec entries that lie entirely before @offset.
         * NOTE(review): offset is a signed int compared with iov_len; if
         * iov_len is an unsigned type at least as wide as int, a negative
         * offset would compare as a very large value.  Callers are assumed
         * to pass offset >= 0 - confirm. */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        sg = tx->tx_frags;
        do {
                LASSERT (niov > 0);

                vaddr = ((unsigned long)iov->iov_base) + offset;
                page_offset = vaddr & (PAGE_SIZE - 1);
                page = kiblnd_kvaddr_to_page(vaddr);
                /* kiblnd_kvaddr_to_page() asserts a non-NULL result before
                 * returning, so this branch is purely defensive */
                if (page == NULL) {
                        CERROR ("Can't find page\n");
                        return -EFAULT;
                }

                /* Limit the fragment to what is left in this kvec entry, what
                 * is left to transfer, and the end of the current page. */
                fragnob = min((int)(iov->iov_len - offset), nob);
                fragnob = min(fragnob, (int)PAGE_SIZE - page_offset);

                sg_set_page(sg, page, fragnob, page_offset);
                /* NOTE(review): sg is advanced before the loop condition is
                 * tested, so if sg_next() returns NULL after the last entry
                 * of tx_frags, filling exactly that entry is reported as
                 * -EFAULT even when nob has just reached 0.  Confirm against
                 * how tx_frags is sized and initialised. */
                sg = sg_next(sg);
                if (!sg) {
                        CERROR("lacking enough sg entries to map tx\n");
                        return -EFAULT;
                }

                /* Stay in this kvec entry while it has bytes left, otherwise
                 * move on to the start of the next one. */
                if (offset + fragnob < iov->iov_len) {
                        offset += fragnob;
                } else {
                        offset = 0;
                        iov++;
                        niov--;
                }
                nob -= fragnob;
        } while (nob > 0);

        /* sg now points one past the last entry filled */
        return kiblnd_map_tx(ni, tx, rd, sg - tx->tx_frags);
}
711
/*
 * Build @tx's scatterlist from a page-based (kiov) fragment array and map it
 * for RDMA.
 *
 * Skips @offset bytes into @kiov, then adds one scatterlist entry per kiov
 * fragment until @nob bytes are covered, and finally calls kiblnd_map_tx()
 * with the number of entries used.
 *
 * Returns the kiblnd_map_tx() result, or -EFAULT when the scatterlist runs
 * out of entries.
 *
 * NOTE(review): the scatterlist cursor is advanced before the remaining
 * byte count is tested, so if sg_next() returns NULL after the last entry of
 * tx_frags, filling exactly that entry yields -EFAULT even though all bytes
 * were placed.  Confirm against how tx_frags is sized and initialised.
 */
static int
kiblnd_setup_rd_kiov (lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd,
                      int nkiov, lnet_kiov_t *kiov, int offset, int nob)
{
        kib_net_t          *net = ni->ni_data;
        struct scatterlist *slot;
        int                 chunk;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (net != NULL);

        /* Skip the fragments that lie entirely before @offset. */
        for (; offset >= kiov->kiov_len; kiov++) {
                offset -= kiov->kiov_len;
                nkiov--;
                LASSERT (nkiov > 0);
        }

        slot = tx->tx_frags;
        for (;;) {
                LASSERT (nkiov > 0);

                chunk = min((int)(kiov->kiov_len - offset), nob);

                sg_set_page(slot, kiov->kiov_page, chunk,
                            kiov->kiov_offset + offset);
                slot = sg_next(slot);
                if (slot == NULL) {
                        CERROR("lacking enough sg entries to map tx\n");
                        return -EFAULT;
                }

                /* Only the first fragment used starts at an offset. */
                offset = 0;
                kiov++;
                nkiov--;
                nob -= chunk;
                if (nob <= 0)
                        break;
        }

        /* slot now points one past the last entry filled */
        return kiblnd_map_tx(ni, tx, rd, slot - tx->tx_frags);
}
755
756 static int
757 kiblnd_post_tx_locked (kib_conn_t *conn, kib_tx_t *tx, int credit)
758 __must_hold(&conn->ibc_lock)
759 {
760         kib_msg_t         *msg = tx->tx_msg;
761         kib_peer_t        *peer = conn->ibc_peer;
762         struct lnet_ni    *ni = peer->ibp_ni;
763         int                ver = conn->ibc_version;
764         int                rc;
765         int                done;
766
767         LASSERT(tx->tx_queued);
768         /* We rely on this for QP sizing */
769         LASSERT(tx->tx_nwrq > 0);
770         LASSERT(tx->tx_nwrq <= 1 + conn->ibc_max_frags);
771
772         LASSERT(credit == 0 || credit == 1);
773         LASSERT(conn->ibc_outstanding_credits >= 0);
774         LASSERT(conn->ibc_outstanding_credits <= conn->ibc_queue_depth);
775         LASSERT(conn->ibc_credits >= 0);
776         LASSERT(conn->ibc_credits <= conn->ibc_queue_depth);
777
778         if (conn->ibc_nsends_posted ==
779             kiblnd_concurrent_sends(ver, ni)) {
780                 /* tx completions outstanding... */
781                 CDEBUG(D_NET, "%s: posted enough\n",
782                        libcfs_nid2str(peer->ibp_nid));
783                 return -EAGAIN;
784         }
785
786         if (credit != 0 && conn->ibc_credits == 0) {   /* no credits */
787                 CDEBUG(D_NET, "%s: no credits\n",
788                        libcfs_nid2str(peer->ibp_nid));
789                 return -EAGAIN;
790         }
791
792         if (credit != 0 && !IBLND_OOB_CAPABLE(ver) &&
793             conn->ibc_credits == 1 &&   /* last credit reserved */
794             msg->ibm_type != IBLND_MSG_NOOP) {      /* for NOOP */
795                 CDEBUG(D_NET, "%s: not using last credit\n",
796                        libcfs_nid2str(peer->ibp_nid));
797                 return -EAGAIN;
798         }
799
800         /* NB don't drop ibc_lock before bumping tx_sending */
801         list_del(&tx->tx_list);
802         tx->tx_queued = 0;
803
804         if (msg->ibm_type == IBLND_MSG_NOOP &&
805             (!kiblnd_need_noop(conn) ||     /* redundant NOOP */
806              (IBLND_OOB_CAPABLE(ver) && /* posted enough NOOP */
807               conn->ibc_noops_posted == IBLND_OOB_MSGS(ver)))) {
808                 /* OK to drop when posted enough NOOPs, since
809                  * kiblnd_check_sends_locked will queue NOOP again when
810                  * posted NOOPs complete */
811                 spin_unlock(&conn->ibc_lock);
812                 kiblnd_tx_done(peer->ibp_ni, tx);
813                 spin_lock(&conn->ibc_lock);
814                 CDEBUG(D_NET, "%s(%d): redundant or enough NOOP\n",
815                        libcfs_nid2str(peer->ibp_nid),
816                        conn->ibc_noops_posted);
817                 return 0;
818         }
819
820         kiblnd_pack_msg(peer->ibp_ni, msg, ver, conn->ibc_outstanding_credits,
821                         peer->ibp_nid, conn->ibc_incarnation);
822
823         conn->ibc_credits -= credit;
824         conn->ibc_outstanding_credits = 0;
825         conn->ibc_nsends_posted++;
826         if (msg->ibm_type == IBLND_MSG_NOOP)
827                 conn->ibc_noops_posted++;
828
829         /* CAVEAT EMPTOR!  This tx could be the PUT_DONE of an RDMA
830          * PUT.  If so, it was first queued here as a PUT_REQ, sent and
831          * stashed on ibc_active_txs, matched by an incoming PUT_ACK,
832          * and then re-queued here.  It's (just) possible that
833          * tx_sending is non-zero if we've not done the tx_complete()
834          * from the first send; hence the ++ rather than = below. */
835         tx->tx_sending++;
836         list_add(&tx->tx_list, &conn->ibc_active_txs);
837
838         /* I'm still holding ibc_lock! */
839         if (conn->ibc_state != IBLND_CONN_ESTABLISHED) {
840                 rc = -ECONNABORTED;
841         } else if (tx->tx_pool->tpo_pool.po_failed ||
842                  conn->ibc_hdev != tx->tx_pool->tpo_hdev) {
843                 /* close_conn will launch failover */
844                 rc = -ENETDOWN;
845         } else {
846                 struct kib_fast_reg_descriptor *frd = tx->fmr.fmr_frd;
847                 struct ib_send_wr *bad = &tx->tx_wrq[tx->tx_nwrq - 1].wr;
848                 struct ib_send_wr *wr  = &tx->tx_wrq[0].wr;
849
850                 if (frd != NULL) {
851                         if (!frd->frd_valid) {
852                                 wr = &frd->frd_inv_wr.wr;
853                                 wr->next = &frd->frd_fastreg_wr.wr;
854                         } else {
855                                 wr = &frd->frd_fastreg_wr.wr;
856                         }
857                         frd->frd_fastreg_wr.wr.next = &tx->tx_wrq[0].wr;
858                 }
859
860                 LASSERTF(bad->wr_id == kiblnd_ptr2wreqid(tx, IBLND_WID_TX),
861                          "bad wr_id %#llx, opc %d, flags %d, peer: %s\n",
862                          bad->wr_id, bad->opcode, bad->send_flags,
863                          libcfs_nid2str(conn->ibc_peer->ibp_nid));
864
865                 bad = NULL;
866                 rc = ib_post_send(conn->ibc_cmid->qp, wr, &bad);
867         }
868
869         conn->ibc_last_send = jiffies;
870
871         if (rc == 0)
872                 return 0;
873
874         /* NB credits are transferred in the actual
875          * message, which can only be the last work item */
876         conn->ibc_credits += credit;
877         conn->ibc_outstanding_credits += msg->ibm_credits;
878         conn->ibc_nsends_posted--;
879         if (msg->ibm_type == IBLND_MSG_NOOP)
880                 conn->ibc_noops_posted--;
881
882         tx->tx_status = rc;
883         tx->tx_waiting = 0;
884         tx->tx_sending--;
885
886         done = (tx->tx_sending == 0);
887         if (done)
888                 list_del(&tx->tx_list);
889
890         spin_unlock(&conn->ibc_lock);
891
892         if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
893                 CERROR("Error %d posting transmit to %s\n",
894                        rc, libcfs_nid2str(peer->ibp_nid));
895         else
896                 CDEBUG(D_NET, "Error %d posting transmit to %s\n",
897                        rc, libcfs_nid2str(peer->ibp_nid));
898
899         kiblnd_close_conn(conn, rc);
900
901         if (done)
902                 kiblnd_tx_done(peer->ibp_ni, tx);
903
904         spin_lock(&conn->ibc_lock);
905
906         return -EIO;
907 }
908
/*
 * Post as much of the queued traffic on 'conn' as credits allow.
 *
 * The caller holds conn->ibc_lock.  The lock is released and re-taken
 * here while an idle tx is fetched for a NOOP, so a caller must not rely
 * on queue state it sampled before making this call.
 */
static void
kiblnd_check_sends_locked(kib_conn_t *conn)
{
	int               ver = conn->ibc_version;
	lnet_ni_t        *ni = conn->ibc_peer->ibp_ni;
	struct list_head *queue;
	kib_tx_t         *tx;
	int               credit;

	/* Nothing may be sent before the connection is established */
	if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
		CDEBUG(D_NET, "%s too soon\n",
		       libcfs_nid2str(conn->ibc_peer->ibp_nid));
		return;
	}

	LASSERT(conn->ibc_nsends_posted <= kiblnd_concurrent_sends(ver, ni));
	LASSERT(!IBLND_OOB_CAPABLE(ver) ||
		conn->ibc_noops_posted <= IBLND_OOB_MSGS(ver));
	LASSERT(conn->ibc_reserved_credits >= 0);

	/* Each reserved credit releases one tx from the reserved queue onto
	 * the tail of the ordinary queue */
	while (conn->ibc_reserved_credits > 0) {
		if (list_empty(&conn->ibc_tx_queue_rsrvd))
			break;

		tx = list_entry(conn->ibc_tx_queue_rsrvd.next,
				kib_tx_t, tx_list);
		list_del(&tx->tx_list);
		list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
		conn->ibc_reserved_credits--;
	}

	if (kiblnd_need_noop(conn)) {
		/* The idle tx is fetched and initialised without ibc_lock */
		spin_unlock(&conn->ibc_lock);
		tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);
		if (tx != NULL)
			kiblnd_init_tx_msg(ni, tx, IBLND_MSG_NOOP, 0);
		spin_lock(&conn->ibc_lock);

		if (tx != NULL)
			kiblnd_queue_tx_locked(tx, conn);
	}

	/* Queue priority: credit-free messages, then NOOPs, then ordinary
	 * traffic.  Stop at the first tx that cannot be posted. */
	for (;;) {
		if (!list_empty(&conn->ibc_tx_queue_nocred)) {
			queue = &conn->ibc_tx_queue_nocred;
			credit = 0;
		} else if (!list_empty(&conn->ibc_tx_noops)) {
			LASSERT(!IBLND_OOB_CAPABLE(ver));
			queue = &conn->ibc_tx_noops;
			credit = 1;
		} else if (!list_empty(&conn->ibc_tx_queue)) {
			queue = &conn->ibc_tx_queue;
			credit = 1;
		} else {
			break;
		}

		tx = list_entry(queue->next, kib_tx_t, tx_list);
		if (kiblnd_post_tx_locked(conn, tx, credit) != 0)
			break;
	}
}
973
/*
 * Handle the completion of one posted send for 'tx'.
 *
 * 'status' is compared with IB_WC_SUCCESS; any other value closes the
 * connection with -EIO and fails the tx.  The tx is finalised here only if
 * this was its last outstanding send and it is neither waiting for the
 * peer nor queued again.
 */
static void
kiblnd_tx_complete (kib_tx_t *tx, int status)
{
	kib_conn_t *conn = tx->tx_conn;
	int         failed = (status != IB_WC_SUCCESS);
	int         idle;

	LASSERT(tx->tx_sending > 0);

	if (!failed) {
		kiblnd_peer_alive(conn->ibc_peer);
	} else {
		/* Only log if the connection was not already being torn down */
		if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
			CNETERR("Tx -> %s cookie %#llx"
				" sending %d waiting %d: failed %d\n",
				libcfs_nid2str(conn->ibc_peer->ibp_nid),
				tx->tx_cookie, tx->tx_sending, tx->tx_waiting,
				status);

		kiblnd_close_conn(conn, -EIO);
	}

	spin_lock(&conn->ibc_lock);

	/* This may race with RDMA completion.  Whichever path makes 'tx'
	 * idle frees it, which also drops its ref on 'conn'. */
	tx->tx_sending--;
	conn->ibc_nsends_posted--;
	if (tx->tx_msg->ibm_type == IBLND_MSG_NOOP)
		conn->ibc_noops_posted--;

	if (failed) {
		tx->tx_waiting = 0;	/* no reply will be waited for */
		tx->tx_status = -EIO;
	}

	/* Idle means: final send callback, no reply pending and not
	 * re-queued (as a PUT_DONE would be) */
	idle = (tx->tx_sending == 0 && !tx->tx_waiting && !tx->tx_queued);
	if (idle)
		list_del(&tx->tx_list);

	kiblnd_check_sends_locked(conn);
	spin_unlock(&conn->ibc_lock);

	if (idle)
		kiblnd_tx_done(conn->ibc_peer->ibp_ni, tx);
}
1023
/*
 * Initialise the message header of 'tx' and append the signalled SEND work
 * request that carries it.
 *
 * 'body_nob' is the size of the message body; the total is asserted only
 * against the IBLND_MSG_SIZE upper bound, so callers are expected to pass
 * a non-negative value.  'ni' is not used in this function.
 */
static void
kiblnd_init_tx_msg (lnet_ni_t *ni, kib_tx_t *tx, int type, int body_nob)
{
	struct ib_mr      *mr = tx->tx_pool->tpo_hdev->ibh_mrs;
	int                msg_nob = offsetof(kib_msg_t, ibm_u) + body_nob;
	struct ib_rdma_wr *wrq;
	struct ib_sge     *sge;

	/* The next slot must exist in the tx's work-request array */
	LASSERT(tx->tx_nwrq >= 0);
	LASSERT(tx->tx_nwrq < IBLND_MAX_RDMA_FRAGS + 1);
	LASSERT(msg_nob <= IBLND_MSG_SIZE);
	LASSERT(mr != NULL);

	kiblnd_init_msg(tx->tx_msg, type, body_nob);

	/* One scatter/gather entry covering the message buffer */
	sge = &tx->tx_sge[tx->tx_nwrq];
	sge->lkey   = mr->lkey;
	sge->addr   = tx->tx_msgaddr;
	sge->length = msg_nob;

	/* The message is the last work request in the chain and the only
	 * signalled one built here */
	wrq = &tx->tx_wrq[tx->tx_nwrq];
	memset(wrq, 0, sizeof(*wrq));

	wrq->wr.next       = NULL;
	wrq->wr.wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_TX);
	wrq->wr.sg_list    = sge;
	wrq->wr.num_sge    = 1;
	wrq->wr.opcode     = IB_WR_SEND;
	wrq->wr.send_flags = IB_SEND_SIGNALED;

	tx->tx_nwrq++;
}
1056
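/*
 * Build the RDMA-write work requests that move 'resid' bytes from the
 * local descriptor (tx->tx_rd) into the remote descriptor 'dstrd', then
 * append the GET_DONE/PUT_DONE completion message.
 *
 * 'dstrd' supplies the remote address and rkey of every write, so its
 * fragment count and sizes are checked on every iteration.  Both fragment
 * indices are bounded with '>=' so that the source and destination checks
 * agree and an index can never step past its descriptor.
 *
 * Returns the original 'resid' on success or a negative errno; the same
 * value is sent to the peer as the completion status.  On failure no RDMA
 * work requests are kept and only the completion message is sent.
 */
static int
kiblnd_init_rdma(kib_conn_t *conn, kib_tx_t *tx, int type,
		 int resid, kib_rdma_desc_t *dstrd, __u64 dstcookie)
{
	kib_msg_t         *ibmsg = tx->tx_msg;
	kib_rdma_desc_t   *srcrd = tx->tx_rd;
	struct ib_sge     *sge;
	struct ib_rdma_wr *wrq;
	int                rc = resid;
	int                srcidx = 0;
	int                dstidx = 0;
	int                wrknob;

	LASSERT(!in_interrupt());
	LASSERT(tx->tx_nwrq == 0);
	LASSERT(type == IBLND_MSG_GET_DONE ||
		type == IBLND_MSG_PUT_DONE);

	while (resid > 0) {
		if (srcidx >= srcrd->rd_nfrags) {
			CERROR("Src buffer exhausted: %d frags\n", srcidx);
			rc = -EPROTO;
			break;
		}

		if (dstidx >= dstrd->rd_nfrags) {
			CERROR("Dst buffer exhausted: %d frags\n", dstidx);
			rc = -EPROTO;
			break;
		}

		/* Never build more work requests than this connection
		 * allows */
		if (tx->tx_nwrq >= conn->ibc_max_frags) {
			CERROR("RDMA has too many fragments for peer %s (%d), "
			       "src idx/frags: %d/%d dst idx/frags: %d/%d\n",
			       libcfs_nid2str(conn->ibc_peer->ibp_nid),
			       conn->ibc_max_frags,
			       srcidx, srcrd->rd_nfrags,
			       dstidx, dstrd->rd_nfrags);
			rc = -EMSGSIZE;
			break;
		}

		/* Write no more than either fragment or the residue holds */
		wrknob = MIN(MIN(kiblnd_rd_frag_size(srcrd, srcidx),
				 kiblnd_rd_frag_size(dstrd, dstidx)), resid);

		sge = &tx->tx_sge[tx->tx_nwrq];
		sge->addr   = kiblnd_rd_frag_addr(srcrd, srcidx);
		sge->lkey   = kiblnd_rd_frag_key(srcrd, srcidx);
		sge->length = wrknob;

		wrq = &tx->tx_wrq[tx->tx_nwrq];

		/* Chain to the next slot; the final slot is filled in by
		 * kiblnd_init_tx_msg() below */
		wrq->wr.next            = &(wrq + 1)->wr;
		wrq->wr.wr_id           = kiblnd_ptr2wreqid(tx, IBLND_WID_RDMA);
		wrq->wr.sg_list         = sge;
		wrq->wr.num_sge         = 1;
		wrq->wr.opcode          = IB_WR_RDMA_WRITE;
		wrq->wr.send_flags      = 0;

#ifdef HAVE_IB_RDMA_WR
		wrq->remote_addr        = kiblnd_rd_frag_addr(dstrd, dstidx);
		wrq->rkey               = kiblnd_rd_frag_key(dstrd, dstidx);
#else
		wrq->wr.wr.rdma.remote_addr = kiblnd_rd_frag_addr(dstrd, dstidx);
		wrq->wr.wr.rdma.rkey    = kiblnd_rd_frag_key(dstrd, dstidx);
#endif

		srcidx = kiblnd_rd_consume_frag(srcrd, srcidx, wrknob);
		dstidx = kiblnd_rd_consume_frag(dstrd, dstidx, wrknob);

		resid -= wrknob;

		/* sge and wrq are recomputed from tx_nwrq on the next pass */
		tx->tx_nwrq++;
	}

	if (rc < 0)		/* no RDMA if completing with failure */
		tx->tx_nwrq = 0;

	ibmsg->ibm_u.completion.ibcm_status = rc;
	ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;
	kiblnd_init_tx_msg(conn->ibc_peer->ibp_ni, tx,
			   type, sizeof(kib_completion_msg_t));

	return rc;
}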
1146
/*
 * Attach 'tx' to 'conn' and append it to the send queue that matches its
 * message type.  The tx takes a reference on 'conn' the first time it is
 * attached.  NB the function itself takes no lock.
 */
static void
kiblnd_queue_tx_locked(kib_tx_t *tx, kib_conn_t *conn)
{
	struct list_head *q;

	LASSERT(tx->tx_nwrq > 0);	/* work requests already built */
	LASSERT(!tx->tx_queued);	/* not sitting on a send queue */
	LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);

	tx->tx_queued = 1;
	tx->tx_deadline = jiffies +
			  msecs_to_jiffies(*kiblnd_tunables.kib_timeout *
					   MSEC_PER_SEC);

	if (tx->tx_conn != NULL) {
		/* Only a PUT_DONE comes back here: it was first attached
		 * to this conn as a PUT_REQ */
		LASSERT(tx->tx_conn == conn);
		LASSERT(tx->tx_msg->ibm_type == IBLND_MSG_PUT_DONE);
	} else {
		kiblnd_conn_addref(conn);
		tx->tx_conn = conn;
		LASSERT(tx->tx_msg->ibm_type != IBLND_MSG_PUT_DONE);
	}

	switch (tx->tx_msg->ibm_type) {
	case IBLND_MSG_IMMEDIATE:
		q = &conn->ibc_tx_queue;
		break;

	case IBLND_MSG_NOOP:
		q = IBLND_OOB_CAPABLE(conn->ibc_version) ?
			&conn->ibc_tx_queue_nocred : &conn->ibc_tx_noops;
		break;

	case IBLND_MSG_PUT_NAK:
	case IBLND_MSG_PUT_ACK:
	case IBLND_MSG_PUT_DONE:
	case IBLND_MSG_GET_DONE:
		q = &conn->ibc_tx_queue_nocred;
		break;

	default:
		/* unknown message type */
		LBUG();

	case IBLND_MSG_PUT_REQ:
	case IBLND_MSG_GET_REQ:
		q = &conn->ibc_tx_queue_rsrvd;
		break;
	}

	list_add_tail(&tx->tx_list, q);
}
1201
/*
 * Queue 'tx' on 'conn' and try to post it at once, taking and releasing
 * conn->ibc_lock around both steps.
 */
static void
kiblnd_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
{
	spinlock_t *lock = &conn->ibc_lock;

	spin_lock(lock);

	kiblnd_queue_tx_locked(tx, conn);
	kiblnd_check_sends_locked(conn);

	spin_unlock(lock);
}
1210
/*
 * Start address resolution on 'cmid' with the source bound to a free
 * privileged port, trying ports from PROT_SOCK - 1 downwards.
 *
 * Returns 0 once a port is accepted, otherwise the error from
 * rdma_set_reuseaddr() or from the last rdma_resolve_addr() attempt.
 *
 * NOTE(review): a privileged source port only shows that the initiator
 * was able to bind below PROT_SOCK on its own host; it is not
 * cryptographic authentication of the peer.  How the accepting side
 * treats the port is not visible here.
 */
static int kiblnd_resolve_addr(struct rdma_cm_id *cmid,
			       struct sockaddr_in *srcaddr,
			       struct sockaddr_in *dstaddr,
			       int timeout_ms)
{
	unsigned short port = PROT_SOCK - 1;
	int rc;

	/* let the source port be reused */
	rc = rdma_set_reuseaddr(cmid, 1);
	if (rc != 0) {
		CERROR("Unable to set reuse on cmid: %d\n", rc);
		return rc;
	}

	while (port > 0) {
		srcaddr->sin_port = htons(port);
		rc = rdma_resolve_addr(cmid,
				       (struct sockaddr *)srcaddr,
				       (struct sockaddr *)dstaddr,
				       timeout_ms);
		switch (rc) {
		case 0:
			CDEBUG(D_NET, "bound to port %hu\n", port);
			return 0;

		case -EADDRINUSE:
		case -EADDRNOTAVAIL:
			/* port unavailable: try the next one down */
			CDEBUG(D_NET, "bind to port %hu failed: %d\n",
			       port, rc);
			break;

		default:
			/* any other error ends the search */
			return rc;
		}

		port--;
	}

	CERROR("Failed to bind to a free privileged port\n");
	return rc;
}
1247
/*
 * Start an active connection attempt to 'peer': create an RDMA CM id and
 * begin address resolution from the local interface address to the peer's
 * address and service port.
 *
 * Every failure is reported through kiblnd_peer_connect_failed().  The CM
 * id holds its own reference on 'peer', which is dropped again here if
 * resolution cannot be started.
 */
static void
kiblnd_connect_peer (kib_peer_t *peer)
{
	kib_net_t         *net = peer->ibp_ni->ni_data;
	struct rdma_cm_id *cmid;
	kib_dev_t         *dev;
	struct sockaddr_in srcaddr;
	struct sockaddr_in dstaddr;
	int                rc;

	LASSERT(net != NULL);
	LASSERT(peer->ibp_connecting > 0);
	LASSERT(!peer->ibp_reconnecting);

	cmid = kiblnd_rdma_create_id(kiblnd_cm_callback, peer, RDMA_PS_TCP,
				     IB_QPT_RC);
	if (IS_ERR(cmid)) {
		CERROR("Can't create CMID for %s: %ld\n",
		       libcfs_nid2str(peer->ibp_nid), PTR_ERR(cmid));
		rc = PTR_ERR(cmid);
		kiblnd_peer_connect_failed(peer, 1, rc);
		return;
	}

	dev = net->ibn_dev;

	/* source: the local device's IP; the port is chosen later */
	memset(&srcaddr, 0, sizeof(srcaddr));
	srcaddr.sin_family = AF_INET;
	srcaddr.sin_addr.s_addr = htonl(dev->ibd_ifip);

	/* destination: address taken from the peer's NID, service port from
	 * the tunables */
	memset(&dstaddr, 0, sizeof(dstaddr));
	dstaddr.sin_family = AF_INET;
	dstaddr.sin_port = htons(*kiblnd_tunables.kib_service);
	dstaddr.sin_addr.s_addr = htonl(LNET_NIDADDR(peer->ibp_nid));

	kiblnd_peer_addref(peer);		/* cmid's ref */

	if (!*kiblnd_tunables.kib_use_priv_port)
		rc = rdma_resolve_addr(cmid,
				       (struct sockaddr *)&srcaddr,
				       (struct sockaddr *)&dstaddr,
				       *kiblnd_tunables.kib_timeout * 1000);
	else
		rc = kiblnd_resolve_addr(cmid, &srcaddr, &dstaddr,
					 *kiblnd_tunables.kib_timeout * 1000);

	if (rc != 0) {
		/* address resolution could not even be started */
		CERROR("Can't resolve addr for %s: %d\n",
		       libcfs_nid2str(peer->ibp_nid), rc);
		kiblnd_peer_connect_failed(peer, 1, rc);
		kiblnd_peer_decref(peer);	/* cmid's ref */
		rdma_destroy_id(cmid);
		return;
	}

	LASSERT(cmid->device != NULL);
	CDEBUG(D_NET, "%s: connection bound to %s:%pI4h:%s\n",
	       libcfs_nid2str(peer->ibp_nid), dev->ibd_ifname,
	       &dev->ibd_ifip, cmid->device->name);
}
1315
/*
 * Try to reconnect to 'peer' if a reconnect was requested for it.
 *
 * Returns true when a new connection attempt was started.  Returns false
 * when no reconnect was pending or the peer is no longer active; in the
 * latter case the transmits queued on the peer are completed with
 * -ECONNABORTED.
 */
bool
kiblnd_reconnect_peer(kib_peer_t *peer)
{
	rwlock_t         *glock = &kiblnd_data.kib_global_lock;
	char             *reason = NULL;
	struct list_head  txs;
	unsigned long     flags;

	INIT_LIST_HEAD(&txs);

	write_lock_irqsave(glock, flags);

	if (peer->ibp_reconnecting == 0) {
		/* No reconnect pending; record why for the log */
		if (peer->ibp_accepting)
			reason = "accepting";
		else if (peer->ibp_connecting)
			reason = "connecting";
		else if (!list_empty(&peer->ibp_conns))
			reason = "connected";
		else /* connected then closed */
			reason = "closed";
	} else {
		LASSERT(!peer->ibp_accepting && !peer->ibp_connecting &&
			list_empty(&peer->ibp_conns));
		peer->ibp_reconnecting = 0;

		if (kiblnd_peer_active(peer)) {
			peer->ibp_connecting++;
			peer->ibp_reconnected++;
			write_unlock_irqrestore(glock, flags);

			kiblnd_connect_peer(peer);
			return true;
		}

		/* Peer is no longer active: take its queued transmits so
		 * they can be failed once the lock is dropped */
		list_splice_init(&peer->ibp_tx_queue, &txs);
		reason = "unlinked";
	}

	write_unlock_irqrestore(glock, flags);

	CWARN("Abort reconnection of %s: %s\n",
	      libcfs_nid2str(peer->ibp_nid), reason);
	kiblnd_txlist_done(peer->ibp_ni, &txs, -ECONNABORTED);
	return false;
}
1365
/* Queue 'tx' (if there is one) on 'conn', then drop the caller's ref on
 * 'conn' */
static void
kiblnd_launch_tx_on_conn(kib_tx_t *tx, kib_conn_t *conn)
{
	if (tx != NULL)
		kiblnd_queue_tx(tx, conn);
	kiblnd_conn_decref(conn);
}

/*
 * Send 'tx' to 'nid', creating the peer and starting a connection when
 * none exists yet.  'tx' may be NULL, in which case only the connection is
 * brought up.
 *
 * Once called, the tx is committed: on any problem it is completed with
 * failure rather than handed back to the caller.
 *
 * The peer table is looked up again after every point where the global
 * lock was dropped, because another thread may have added the same peer
 * in the meantime.
 */
void
kiblnd_launch_tx (lnet_ni_t *ni, kib_tx_t *tx, lnet_nid_t nid)
{
	rwlock_t      *g_lock = &kiblnd_data.kib_global_lock;
	kib_peer_t    *peer;
	kib_peer_t    *peer2;
	kib_conn_t    *conn;
	unsigned long  flags;
	int            rc;

	LASSERT(tx == NULL || tx->tx_conn == NULL); /* only set when assigned a conn */
	LASSERT(tx == NULL || tx->tx_nwrq > 0);     /* work items have been set up */

	/* Common case first: the peer is expected to be connected already,
	 * so a read lock is enough */
	read_lock_irqsave(g_lock, flags);

	peer = kiblnd_find_peer_locked(nid);
	if (peer != NULL && !list_empty(&peer->ibp_conns)) {
		conn = kiblnd_get_conn_locked(peer);
		kiblnd_conn_addref(conn);	/* held until the tx is queued */

		read_unlock_irqrestore(g_lock, flags);

		kiblnd_launch_tx_on_conn(tx, conn);
		return;
	}

	/* Swap the read lock for a write lock.  Interrupts are not
	 * re-enabled in between; 'flags' is restored by whichever unlock
	 * follows. */
	read_unlock(g_lock);
	write_lock(g_lock);

	peer = kiblnd_find_peer_locked(nid);
	if (peer != NULL) {
		if (!list_empty(&peer->ibp_conns)) {
			conn = kiblnd_get_conn_locked(peer);
			kiblnd_conn_addref(conn); /* held until the tx is queued */

			write_unlock_irqrestore(g_lock, flags);

			kiblnd_launch_tx_on_conn(tx, conn);
		} else {
			/* peer exists but is still connecting: park the tx
			 * on the peer */
			LASSERT(kiblnd_peer_connecting(peer));
			if (tx != NULL)
				list_add_tail(&tx->tx_list,
					      &peer->ibp_tx_queue);
			write_unlock_irqrestore(g_lock, flags);
		}
		return;
	}

	write_unlock_irqrestore(g_lock, flags);

	/* No such peer: allocate one outside the lock, then look again */
	rc = kiblnd_create_peer(ni, &peer, nid);
	if (rc != 0) {
		CERROR("Can't create peer %s\n", libcfs_nid2str(nid));
		if (tx != NULL) {
			tx->tx_status = -EHOSTUNREACH;
			tx->tx_waiting = 0;
			kiblnd_tx_done(ni, tx);
		}
		return;
	}

	write_lock_irqsave(g_lock, flags);

	peer2 = kiblnd_find_peer_locked(nid);
	if (peer2 != NULL) {
		/* Another thread added the peer first: use that one and
		 * drop the peer allocated above */
		if (!list_empty(&peer2->ibp_conns)) {
			conn = kiblnd_get_conn_locked(peer2);
			kiblnd_conn_addref(conn); /* held until the tx is queued */

			write_unlock_irqrestore(g_lock, flags);

			kiblnd_launch_tx_on_conn(tx, conn);
		} else {
			/* that peer is still connecting */
			LASSERT(kiblnd_peer_connecting(peer2));
			if (tx != NULL)
				list_add_tail(&tx->tx_list,
					      &peer2->ibp_tx_queue);
			write_unlock_irqrestore(g_lock, flags);
		}

		kiblnd_peer_decref(peer);
		return;
	}

	/* Brand new peer */
	LASSERT(peer->ibp_connecting == 0);
	peer->ibp_connecting = 1;

	/* always called with a ref on ni, which prevents ni being shutdown */
	LASSERT(((kib_net_t *)ni->ni_data)->ibn_shutdown == 0);

	if (tx != NULL)
		list_add_tail(&tx->tx_list, &peer->ibp_tx_queue);

	/* presumably this ref belongs to the peer table entry added next */
	kiblnd_peer_addref(peer);
	list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));

	write_unlock_irqrestore(g_lock, flags);

	kiblnd_connect_peer(peer);
	kiblnd_peer_decref(peer);
}
1484
1485 int
1486 kiblnd_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1487 {
1488         lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
1489         int               type = lntmsg->msg_type;
1490         lnet_process_id_t target = lntmsg->msg_target;
1491         int               target_is_router = lntmsg->msg_target_is_router;
1492         int               routing = lntmsg->msg_routing;
1493         unsigned int      payload_niov = lntmsg->msg_niov;
1494         struct kvec      *payload_iov = lntmsg->msg_iov;
1495         lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
1496         unsigned int      payload_offset = lntmsg->msg_offset;
1497         unsigned int      payload_nob = lntmsg->msg_len;
1498         kib_msg_t        *ibmsg;
1499         kib_rdma_desc_t  *rd;
1500         kib_tx_t         *tx;
1501         int               nob;
1502         int               rc;
1503
1504         /* NB 'private' is different depending on what we're sending.... */
1505
1506         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1507                payload_nob, payload_niov, libcfs_id2str(target));
1508
1509         LASSERT (payload_nob == 0 || payload_niov > 0);
1510         LASSERT (payload_niov <= LNET_MAX_IOV);
1511
1512         /* Thread context */
1513         LASSERT (!in_interrupt());
1514         /* payload is either all vaddrs or all pages */
1515         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1516
1517         switch (type) {
1518         default:
1519                 LBUG();
1520                 return (-EIO);
1521
1522         case LNET_MSG_ACK:
1523                 LASSERT (payload_nob == 0);
1524                 break;
1525
1526         case LNET_MSG_GET:
1527                 if (routing || target_is_router)
1528                         break;                  /* send IMMEDIATE */
1529
1530                 /* is the REPLY message too small for RDMA? */
1531                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
1532                 if (nob <= IBLND_MSG_SIZE)
1533                         break;                  /* send IMMEDIATE */
1534
1535                 tx = kiblnd_get_idle_tx(ni, target.nid);
1536                 if (tx == NULL) {
1537                         CERROR("Can't allocate txd for GET to %s\n",
1538                                libcfs_nid2str(target.nid));
1539                         return -ENOMEM;
1540                 }
1541
1542                 ibmsg = tx->tx_msg;
1543                 rd = &ibmsg->ibm_u.get.ibgm_rd;
1544                 if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
1545                         rc = kiblnd_setup_rd_iov(ni, tx, rd,
1546                                                  lntmsg->msg_md->md_niov,
1547                                                  lntmsg->msg_md->md_iov.iov,
1548                                                  0, lntmsg->msg_md->md_length);
1549                 else
1550                         rc = kiblnd_setup_rd_kiov(ni, tx, rd,
1551                                                   lntmsg->msg_md->md_niov,
1552                                                   lntmsg->msg_md->md_iov.kiov,
1553                                                   0, lntmsg->msg_md->md_length);
1554                 if (rc != 0) {
1555                         CERROR("Can't setup GET sink for %s: %d\n",
1556                                libcfs_nid2str(target.nid), rc);
1557                         kiblnd_tx_done(ni, tx);
1558                         return -EIO;
1559                 }
1560
1561                 nob = offsetof(kib_get_msg_t, ibgm_rd.rd_frags[rd->rd_nfrags]);
1562                 ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie;
1563                 ibmsg->ibm_u.get.ibgm_hdr = *hdr;
1564
1565                 kiblnd_init_tx_msg(ni, tx, IBLND_MSG_GET_REQ, nob);
1566
1567                 tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
1568                 if (tx->tx_lntmsg[1] == NULL) {
1569                         CERROR("Can't create reply for GET -> %s\n",
1570                                libcfs_nid2str(target.nid));
1571                         kiblnd_tx_done(ni, tx);
1572                         return -EIO;
1573                 }
1574
1575                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg[0,1] on completion */
1576                 tx->tx_waiting = 1;             /* waiting for GET_DONE */
1577                 kiblnd_launch_tx(ni, tx, target.nid);
1578                 return 0;
1579
1580         case LNET_MSG_REPLY:
1581         case LNET_MSG_PUT:
1582                 /* Is the payload small enough not to need RDMA? */
1583                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1584                 if (nob <= IBLND_MSG_SIZE)
1585                         break;                  /* send IMMEDIATE */
1586
1587                 tx = kiblnd_get_idle_tx(ni, target.nid);
1588                 if (tx == NULL) {
1589                         CERROR("Can't allocate %s txd for %s\n",
1590                                type == LNET_MSG_PUT ? "PUT" : "REPLY",
1591                                libcfs_nid2str(target.nid));
1592                         return -ENOMEM;
1593                 }
1594
1595                 if (payload_kiov == NULL)
1596                         rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
1597                                                  payload_niov, payload_iov,
1598                                                  payload_offset, payload_nob);
1599                 else
1600                         rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
1601                                                   payload_niov, payload_kiov,
1602                                                   payload_offset, payload_nob);
1603                 if (rc != 0) {
1604                         CERROR("Can't setup PUT src for %s: %d\n",
1605                                libcfs_nid2str(target.nid), rc);
1606                         kiblnd_tx_done(ni, tx);
1607                         return -EIO;
1608                 }
1609
1610                 ibmsg = tx->tx_msg;
1611                 ibmsg->ibm_u.putreq.ibprm_hdr = *hdr;
1612                 ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie;
1613                 kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_REQ, sizeof(kib_putreq_msg_t));
1614
1615                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1616                 tx->tx_waiting = 1;             /* waiting for PUT_{ACK,NAK} */
1617                 kiblnd_launch_tx(ni, tx, target.nid);
1618                 return 0;
1619         }
1620
1621         /* send IMMEDIATE */
1622
1623         LASSERT (offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob])
1624                  <= IBLND_MSG_SIZE);
1625
1626         tx = kiblnd_get_idle_tx(ni, target.nid);
1627         if (tx == NULL) {
1628                 CERROR ("Can't send %d to %s: tx descs exhausted\n",
1629                         type, libcfs_nid2str(target.nid));
1630                 return -ENOMEM;
1631         }
1632
1633         ibmsg = tx->tx_msg;
1634         ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1635
1636         if (payload_kiov != NULL)
1637                 lnet_copy_kiov2flat(IBLND_MSG_SIZE, ibmsg,
1638                                     offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1639                                     payload_niov, payload_kiov,
1640                                     payload_offset, payload_nob);
1641         else
1642                 lnet_copy_iov2flat(IBLND_MSG_SIZE, ibmsg,
1643                                    offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1644                                    payload_niov, payload_iov,
1645                                    payload_offset, payload_nob);
1646
1647         nob = offsetof(kib_immediate_msg_t, ibim_payload[payload_nob]);
1648         kiblnd_init_tx_msg(ni, tx, IBLND_MSG_IMMEDIATE, nob);
1649
1650         tx->tx_lntmsg[0] = lntmsg;              /* finalise lntmsg on completion */
1651         kiblnd_launch_tx(ni, tx, target.nid);
1652         return 0;
1653 }
1654
/*
 * Answer an optimized GET: describe lntmsg's payload as the RDMA source,
 * build an IBLND_MSG_GET_DONE with kiblnd_init_rdma() and queue it on the
 * connection the request arrived on.  Any failure finalizes lntmsg with
 * -EIO.
 *
 * NOTE(review): the sink descriptor (ibgm_rd) and cookie are read straight
 * from the received message (rx->rx_msg), so they are peer-supplied;
 * presumably kiblnd_init_rdma() bounds them -- confirm.
 */
static void
kiblnd_reply(lnet_ni_t *ni, kib_rx_t *rx, lnet_msg_t *lntmsg)
{
        lnet_process_id_t target = lntmsg->msg_target;
        unsigned int      src_niov = lntmsg->msg_niov;
        struct kvec      *src_iov = lntmsg->msg_iov;
        lnet_kiov_t      *src_kiov = lntmsg->msg_kiov;
        unsigned int      src_off = lntmsg->msg_offset;
        unsigned int      len = lntmsg->msg_len;
        kib_tx_t         *tx;
        int               rc = 0;

        tx = kiblnd_get_idle_tx(ni, rx->rx_conn->ibc_peer->ibp_nid);
        if (tx == NULL) {
                CERROR("Can't get tx for REPLY to %s\n",
                       libcfs_nid2str(target.nid));
                goto failed_0;
        }

        /* A zero-length reply has no payload to describe */
        if (len != 0) {
                if (src_kiov == NULL)
                        rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
                                                 src_niov, src_iov,
                                                 src_off, len);
                else
                        rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
                                                  src_niov, src_kiov,
                                                  src_off, len);
        }

        if (rc != 0) {
                CERROR("Can't setup GET src for %s: %d\n",
                       libcfs_nid2str(target.nid), rc);
                goto failed_1;
        }

        rc = kiblnd_init_rdma(rx->rx_conn, tx,
                              IBLND_MSG_GET_DONE, len,
                              &rx->rx_msg->ibm_u.get.ibgm_rd,
                              rx->rx_msg->ibm_u.get.ibgm_cookie);
        if (rc < 0) {
                CERROR("Can't setup rdma for GET from %s: %d\n",
                       libcfs_nid2str(target.nid), rc);
                goto failed_1;
        }

        if (len != 0) {
                /* RDMA: lntmsg is finalized when the tx completes */
                tx->tx_lntmsg[0] = lntmsg;
        } else {
                /* No RDMA: local completion may happen now! */
                lnet_finalize(ni, lntmsg, 0);
        }

        kiblnd_queue_tx(tx, rx->rx_conn);
        return;

 failed_1:
        /* give the descriptor back before reporting the failure */
        kiblnd_tx_done(ni, tx);
 failed_0:
        lnet_finalize(ni, lntmsg, -EIO);
}
1716
/*
 * Receive handler: deliver, or arrange delivery of, the message held in the
 * rx descriptor passed as 'private'.
 *
 * Dispatches on rxmsg->ibm_type:
 *  - IBLND_MSG_IMMEDIATE: mlen bytes are copied out of the rx buffer and
 *    lntmsg is finalized here;
 *  - IBLND_MSG_PUT_REQ: a PUT_ACK carrying an RDMA sink descriptor is
 *    queued, or a PUT_NAK is sent when mlen is 0 or the sink setup fails;
 *  - IBLND_MSG_GET_REQ: handed to kiblnd_reply(), or answered with
 *    GET_DONE/-ENODATA when lntmsg is NULL.
 *
 * The rx buffer is reposted with 'post_credit' on every path that leaves
 * the switch.  Returns 0, -EPROTO (immediate message larger than what was
 * received), -ENOMEM (no tx descriptor) or the non-zero rc of the sink
 * setup helper.
 */
int
kiblnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
            unsigned int niov, struct kvec *iov, lnet_kiov_t *kiov,
            unsigned int offset, unsigned int mlen, unsigned int rlen)
{
        kib_rx_t    *rx = private;
        kib_msg_t   *rxmsg = rx->rx_msg;
        kib_conn_t  *conn = rx->rx_conn;
        kib_tx_t    *tx;
        int          nob;
        int          post_credit = IBLND_POSTRX_PEER_CREDIT;
        int          rc = 0;

        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch (rxmsg->ibm_type) {
        default:
                /* NOTE(review): ibm_type is read from the received message;
                 * presumably it was validated before this handler runs --
                 * confirm, because an unexpected value ends in LBUG() */
                LBUG();

        case IBLND_MSG_IMMEDIATE:
                /* Size of a message carrying rlen payload bytes; it must
                 * not exceed rx->rx_nob or the payload would extend past
                 * what this rx holds.
                 * NOTE(review): nob is a signed int derived from the
                 * unsigned rlen; assumes the caller has bounded rlen so
                 * the conversion cannot wrap -- TODO confirm */
                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
                if (nob > rx->rx_nob) {
                        CERROR ("Immediate message from %s too big: %d(%d)\n",
                                libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
                                nob, rx->rx_nob);
                        rc = -EPROTO;
                        break;
                }

                /* Copy only the mlen bytes the caller wants (mlen <= rlen
                 * is asserted above) into the page or vaddr vector */
                if (kiov != NULL)
                        lnet_copy_flat2kiov(niov, kiov, offset,
                                            IBLND_MSG_SIZE, rxmsg,
                                            offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                            mlen);
                else
                        lnet_copy_flat2iov(niov, iov, offset,
                                           IBLND_MSG_SIZE, rxmsg,
                                           offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                           mlen);
                lnet_finalize (ni, lntmsg, 0);
                break;

        case IBLND_MSG_PUT_REQ: {
                kib_msg_t       *txmsg;
                kib_rdma_desc_t *rd;

                /* Nothing wanted: complete locally and NAK with status 0,
                 * echoing the cookie taken from the request */
                if (mlen == 0) {
                        lnet_finalize(ni, lntmsg, 0);
                        kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, 0,
                                               rxmsg->ibm_u.putreq.ibprm_cookie);
                        break;
                }

                tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);
                if (tx == NULL) {
                        CERROR("Can't allocate tx for %s\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        /* Not replying will break the connection */
                        rc = -ENOMEM;
                        break;
                }

                /* The sink descriptor is built directly inside the PUT_ACK
                 * message body */
                txmsg = tx->tx_msg;
                rd = &txmsg->ibm_u.putack.ibpam_rd;
                if (kiov == NULL)
                        rc = kiblnd_setup_rd_iov(ni, tx, rd,
                                                 niov, iov, offset, mlen);
                else
                        rc = kiblnd_setup_rd_kiov(ni, tx, rd,
                                                  niov, kiov, offset, mlen);
                if (rc != 0) {
                        CERROR("Can't setup PUT sink for %s: %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
                        kiblnd_tx_done(ni, tx);
                        /* tell peer it's over */
                        kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, rc,
                                               rxmsg->ibm_u.putreq.ibprm_cookie);
                        break;
                }

                /* Message body length covers only the rd_nfrags fragments
                 * that the setup helper filled in */
                nob = offsetof(kib_putack_msg_t, ibpam_rd.rd_frags[rd->rd_nfrags]);
                txmsg->ibm_u.putack.ibpam_src_cookie = rxmsg->ibm_u.putreq.ibprm_cookie;
                txmsg->ibm_u.putack.ibpam_dst_cookie = tx->tx_cookie;

                kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_ACK, nob);

                tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
                tx->tx_waiting = 1;             /* waiting for PUT_DONE */
                kiblnd_queue_tx(tx, conn);

                /* reposted buffer reserved for PUT_DONE */
                post_credit = IBLND_POSTRX_NO_CREDIT;
                break;
                }

        case IBLND_MSG_GET_REQ:
                if (lntmsg != NULL) {
                        /* Optimized GET; RDMA lntmsg's payload */
                        kiblnd_reply(ni, rx, lntmsg);
                } else {
                        /* GET didn't match anything */
                        kiblnd_send_completion(rx->rx_conn, IBLND_MSG_GET_DONE,
                                               -ENODATA,
                                               rxmsg->ibm_u.get.ibgm_cookie);
                }
                break;
        }

        /* Repost the rx buffer; rc may be non-zero at this point */
        kiblnd_post_rx(rx, post_credit);
        return rc;
}
1831
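/*
 * Start a kernel thread running fn(arg) under the given name and count it
 * in kiblnd_data.kib_nthreads.
 *
 * Returns 0 on success, or the negative errno encoded in the pointer that
 * kthread_run() returned; the thread counter is not touched on failure.
 */
int
kiblnd_thread_start(int (*fn)(void *arg), void *arg, char *name)
{
        /* kthread_run() takes a printf-style name format.  Pass 'name' as
         * data so a '%' in it is never interpreted as a conversion. */
        struct task_struct *task = kthread_run(fn, arg, "%s", name);

        if (IS_ERR(task))
                return PTR_ERR(task);

        atomic_inc(&kiblnd_data.kib_nthreads);
        return 0;
}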
1843
/* Drop one thread from kiblnd_data.kib_nthreads, the counter that
 * kiblnd_thread_start() increments. */
static void
kiblnd_thread_fini(void)
{
        atomic_dec(&kiblnd_data.kib_nthreads);
}
1849
/*
 * Stamp the peer as alive now.  The store is deliberately unsynchronized:
 * every writer stores the current time, so the race is tolerated.
 */
static void
kiblnd_peer_alive(kib_peer_t *peer)
{
        cfs_time_t now = cfs_time_current();

        peer->ibp_last_alive = now;
        smp_mb();       /* full barrier after the store */
}
1857
/*
 * If the peer is idle and has an error recorded, consume that error and
 * tell LNet about it together with the last time the peer was seen alive.
 *
 * NOTE(review): ibp_error is cleared while only the read side of
 * kib_global_lock is held, so two concurrent callers are not excluded from
 * each other by this lock -- confirm that is acceptable.
 */
static void
kiblnd_peer_notify(kib_peer_t *peer)
{
        rwlock_t      *glock = &kiblnd_data.kib_global_lock;
        unsigned long  irq_flags;
        cfs_time_t     seen = 0;
        int            reported = 0;

        read_lock_irqsave(glock, irq_flags);
        if (kiblnd_peer_idle(peer) && peer->ibp_error != 0) {
                /* take the error and reset it so it is reported once */
                reported = peer->ibp_error;
                peer->ibp_error = 0;

                seen = peer->ibp_last_alive;
        }
        read_unlock_irqrestore(glock, irq_flags);

        if (reported == 0)
                return;

        /* called without the global lock held */
        lnet_notify(peer->ibp_ni, peer->ibp_nid, 0, seen);
}
1880
/*
 * Begin closing 'conn'.  'error' is 0 for a normal shutdown and a negative
 * errno otherwise; only the first non-zero error is kept in
 * conn->ibc_comms_error.
 */
void
kiblnd_close_conn_locked (kib_conn_t *conn, int error)
{
        /* This just does the immediate housekeeping.  'error' is zero for a
         * normal shutdown which can happen only after the connection has been
         * established.  If the connection is established, schedule the
         * connection to be finished off by the connd.  Otherwise the connd is
         * already dealing with it (either to set it up or tear it down).
         * Caller holds kib_global_lock exclusively in irq context */
        kib_peer_t       *peer = conn->ibc_peer;
        kib_dev_t        *dev;
        unsigned long     flags;

        LASSERT (error != 0 || conn->ibc_state >= IBLND_CONN_ESTABLISHED);

        /* Record the first error only; later ones do not overwrite it */
        if (error != 0 && conn->ibc_comms_error == 0)
                conn->ibc_comms_error = error;

        if (conn->ibc_state != IBLND_CONN_ESTABLISHED)
                return; /* already being handled  */

        /* A clean close with every tx list empty is logged at debug level;
         * anything else goes to the console with one tag per non-empty
         * list */
        if (error == 0 &&
            list_empty(&conn->ibc_tx_noops) &&
            list_empty(&conn->ibc_tx_queue) &&
            list_empty(&conn->ibc_tx_queue_rsrvd) &&
            list_empty(&conn->ibc_tx_queue_nocred) &&
            list_empty(&conn->ibc_active_txs)) {
                CDEBUG(D_NET, "closing conn to %s\n", 
                       libcfs_nid2str(peer->ibp_nid));
        } else {
                CNETERR("Closing conn to %s: error %d%s%s%s%s%s\n",
                       libcfs_nid2str(peer->ibp_nid), error,
                       list_empty(&conn->ibc_tx_queue) ? "" : "(sending)",
                       list_empty(&conn->ibc_tx_noops) ? "" : "(sending_noops)",
                       list_empty(&conn->ibc_tx_queue_rsrvd) ?
                                                "" : "(sending_rsrvd)",
                       list_empty(&conn->ibc_tx_queue_nocred) ?
                                                 "" : "(sending_nocred)",
                       list_empty(&conn->ibc_active_txs) ? "" : "(waiting)");
        }

        dev = ((kib_net_t *)peer->ibp_ni->ni_data)->ibn_dev;
        list_del(&conn->ibc_list);
        /* connd (see below) takes over ibc_list's ref */

        if (list_empty(&peer->ibp_conns) &&    /* no more conns */
            kiblnd_peer_active(peer)) {         /* still in peer table */
                kiblnd_unlink_peer_locked(peer);

                /* set/clear error on last conn */
                peer->ibp_error = conn->ibc_comms_error;
        }

        kiblnd_set_conn_state(conn, IBLND_CONN_CLOSING);

        /* NOTE(review): dev->ibd_fail_list is added without a visible
         * "already queued" check; presumably kiblnd_dev_can_failover()
         * refuses a device that is already on kib_failed_devs -- confirm,
         * since adding a linked node again would corrupt the list */
        if (error != 0 &&
            kiblnd_dev_can_failover(dev)) {
                list_add_tail(&dev->ibd_fail_list,
                              &kiblnd_data.kib_failed_devs);
                wake_up(&kiblnd_data.kib_failover_waitq);
        }

        /* kib_connd_lock is taken here while the caller still holds
         * kib_global_lock; hand the conn to the connd and wake it */
        spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);

        list_add_tail(&conn->ibc_list, &kiblnd_data.kib_connd_conns);
        wake_up(&kiblnd_data.kib_connd_waitq);

        spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
}
1950
/*
 * Convenience wrapper: take kib_global_lock for writing (IRQ state saved)
 * around kiblnd_close_conn_locked().
 */
void
kiblnd_close_conn(kib_conn_t *conn, int error)
{
        rwlock_t      *glock = &kiblnd_data.kib_global_lock;
        unsigned long  irq_flags;

        write_lock_irqsave(glock, irq_flags);
        kiblnd_close_conn_locked(conn, error);
        write_unlock_irqrestore(glock, irq_flags);
}
1962
/*
 * Drain conn->ibc_early_rxs: unlink each queued rx under kib_global_lock,
 * then drop the lock while kiblnd_handle_rx() processes it.  Must not be
 * called from interrupt context, and only once the connection state is at
 * least IBLND_CONN_ESTABLISHED.
 */
static void
kiblnd_handle_early_rxs(kib_conn_t *conn)
{
        rwlock_t        *glock = &kiblnd_data.kib_global_lock;
        unsigned long    irq_flags;
        kib_rx_t        *early;

        LASSERT(!in_interrupt());
        LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);

        write_lock_irqsave(glock, irq_flags);
        for (;;) {
                /* emptiness is re-checked under the lock on every pass */
                if (list_empty(&conn->ibc_early_rxs))
                        break;

                early = list_entry(conn->ibc_early_rxs.next,
                                   kib_rx_t, rx_list);
                list_del(&early->rx_list);

                /* the rx is handled with the global lock released */
                write_unlock_irqrestore(glock, irq_flags);
                kiblnd_handle_rx(early);
                write_lock_irqsave(glock, irq_flags);
        }
        write_unlock_irqrestore(glock, irq_flags);
}
1985
/*
 * Mark every tx on 'txs' (one of conn's tx lists) as aborted with
 * -ECONNABORTED.  Descriptors with no send outstanding are moved to a
 * private list under conn->ibc_lock and completed after the lock is
 * dropped; those with tx_sending != 0 stay on their list.
 */
static void
kiblnd_abort_txs(kib_conn_t *conn, struct list_head *txs)
{
        struct list_head         aborted = LIST_HEAD_INIT(aborted);
        struct list_head        *pos;
        struct list_head        *next;
        int                      on_active = (txs == &conn->ibc_active_txs);

        spin_lock(&conn->ibc_lock);

        list_for_each_safe(pos, next, txs) {
                kib_tx_t *tx = list_entry(pos, kib_tx_t, tx_list);

                /* sanity-check the state expected for the list in hand */
                if (on_active) {
                        LASSERT(!tx->tx_queued);
                        LASSERT(tx->tx_waiting ||
                                tx->tx_sending != 0);
                } else {
                        LASSERT(tx->tx_queued);
                }

                tx->tx_status = -ECONNABORTED;
                tx->tx_waiting = 0;

                /* a send is still outstanding: leave it where it is */
                if (tx->tx_sending != 0)
                        continue;

                tx->tx_queued = 0;
                list_del(&tx->tx_list);
                list_add(&tx->tx_list, &aborted);
        }

        spin_unlock(&conn->ibc_lock);

        /* completion runs outside ibc_lock */
        kiblnd_txlist_done(conn->ibc_peer->ibp_ni, &aborted, -ECONNABORTED);
}
2021
/*
 * Final teardown of a connection: mark it IBLND_CONN_DISCONNECTED, abort
 * its receives, abort the txs on each of its tx lists, then process any
 * early rxs.  Must not be called from interrupt context.
 */
static void
kiblnd_finalise_conn(kib_conn_t *conn)
{
        /* lists are aborted in this order */
        struct list_head *tx_lists[] = {
                &conn->ibc_tx_noops,
                &conn->ibc_tx_queue,
                &conn->ibc_tx_queue_rsrvd,
                &conn->ibc_tx_queue_nocred,
                &conn->ibc_active_txs,
        };
        unsigned int i;

        LASSERT (!in_interrupt());
        LASSERT (conn->ibc_state > IBLND_CONN_INIT);

        kiblnd_set_conn_state(conn, IBLND_CONN_DISCONNECTED);

        /* abort_receives moves QP state to IB_QPS_ERR.  Only connections
         * that never got as far as being connected need this, because
         * rdma_disconnect() does it for free. */
        kiblnd_abort_receives(conn);

        /* Complete all tx descs not waiting for sends to complete.
         * NB we should be safe from RDMA now that the QP has changed state */
        for (i = 0; i < sizeof(tx_lists) / sizeof(tx_lists[0]); i++)
                kiblnd_abort_txs(conn, tx_lists[i]);

        kiblnd_handle_early_rxs(conn);
}
2046
/*
 * Account for a failed connection attempt to 'peer'.
 *
 * 'active' selects which in-progress counter is dropped: ibp_connecting
 * when non-zero, ibp_accepting otherwise.  'error' must be non-zero.  If no
 * other attempt is under way and the peer has no connections, its blocked
 * transmits are taken off ibp_tx_queue, the peer is unlinked if it is still
 * active, 'error' is recorded in ibp_error, and the taken transmits are
 * completed with -EHOSTUNREACH after the lock is released.
 */
static void
kiblnd_peer_connect_failed(kib_peer_t *peer, int active, int error)
{
        struct list_head zombies = LIST_HEAD_INIT(zombies);
        unsigned long   flags;

        LASSERT (error != 0);
        LASSERT (!in_interrupt());

        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);

        /* The counters are asserted positive before being decremented */
        if (active) {
                LASSERT (peer->ibp_connecting > 0);
                peer->ibp_connecting--;
        } else {
                LASSERT (peer->ibp_accepting > 0);
                peer->ibp_accepting--;
        }

        if (kiblnd_peer_connecting(peer)) {
                /* another connection attempt under way... */
                write_unlock_irqrestore(&kiblnd_data.kib_global_lock,
                                        flags);
                return;
        }

        peer->ibp_reconnected = 0;
        if (list_empty(&peer->ibp_conns)) {
                /* Take peer's blocked transmits to complete with error */
                /* Splice idiom: link 'zombies' into the queue as a new
                 * head, then unlink and reinitialise the old head, which
                 * leaves every queued entry on 'zombies' */
                list_add(&zombies, &peer->ibp_tx_queue);
                list_del_init(&peer->ibp_tx_queue);

                if (kiblnd_peer_active(peer))
                        kiblnd_unlink_peer_locked(peer);

                peer->ibp_error = error;
        } else {
                /* Can't have blocked transmits if there are connections */
                LASSERT(list_empty(&peer->ibp_tx_queue));
        }

        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);

        /* Notification and tx completion both run without the global lock */
        kiblnd_peer_notify(peer);

        if (list_empty(&zombies))
                return;

        CNETERR("Deleting messages for %s: connection failed\n",
                libcfs_nid2str(peer->ibp_nid));

        kiblnd_txlist_done(peer->ibp_ni, &zombies, -EHOSTUNREACH);
}
2100
/*
 * Finish a connection attempt on 'conn'; 'status' is 0 on success and
 * non-zero on failure.
 *
 * The attempt is "active" when conn->ibc_state is
 * IBLND_CONN_ACTIVE_CONNECT, otherwise it must be IBLND_CONN_PASSIVE_WAIT
 * (asserted below).  On failure the peer is told via
 * kiblnd_peer_connect_failed() and the conn is finalised.  On success the
 * conn is marked established, linked on the peer's conn list, conns from a
 * different version/incarnation are closed, and the peer's blocked
 * transmits and any early rxs are scheduled.  Must not be called from
 * interrupt context.
 */
static void
kiblnd_connreq_done(kib_conn_t *conn, int status)
{
        kib_peer_t       *peer = conn->ibc_peer;
        kib_tx_t         *tx;
        struct list_head txs;
        unsigned long    flags;
        int              active;

        active = (conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);

        CDEBUG(D_NET,"%s: active(%d), version(%x), status(%d)\n",
               libcfs_nid2str(peer->ibp_nid), active,
               conn->ibc_version, status);

        LASSERT (!in_interrupt());
        LASSERT ((conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT &&
                  peer->ibp_connecting > 0) ||
                 (conn->ibc_state == IBLND_CONN_PASSIVE_WAIT &&
                  peer->ibp_accepting > 0));

        /* The connection-setup state is freed here and the pointer cleared
         * so it cannot be reused afterwards */
        LIBCFS_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
        conn->ibc_connvars = NULL;

        if (status != 0) {
                /* failed to establish connection */
                kiblnd_peer_connect_failed(peer, active, status);
                kiblnd_finalise_conn(conn);
                return;
        }

        /* connection established */
        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);

        conn->ibc_last_send = jiffies;
        kiblnd_set_conn_state(conn, IBLND_CONN_ESTABLISHED);
        kiblnd_peer_alive(peer);

        /* Add conn to peer's list and nuke any dangling conns from a different
         * peer instance... */
        kiblnd_conn_addref(conn);       /* +1 ref for ibc_list */
        list_add(&conn->ibc_list, &peer->ibp_conns);
        peer->ibp_reconnected = 0;
        if (active)
                peer->ibp_connecting--;
        else
                peer->ibp_accepting--;

        /* First connection to this peer: adopt the conn's version and
         * incarnation */
        if (peer->ibp_version == 0) {
                peer->ibp_version     = conn->ibc_version;
                peer->ibp_incarnation = conn->ibc_incarnation;
        }

        /* Mismatch: close the stale conns, then adopt the new conn's
         * version and incarnation */
        if (peer->ibp_version     != conn->ibc_version ||
            peer->ibp_incarnation != conn->ibc_incarnation) {
                kiblnd_close_stale_conns_locked(peer, conn->ibc_version,
                                                conn->ibc_incarnation);
                peer->ibp_version     = conn->ibc_version;
                peer->ibp_incarnation = conn->ibc_incarnation;
        }

        /* grab pending txs while I have the lock */
        /* 'txs' needs no initialiser: list_add() links it in as the new
         * head and list_del_init() then detaches the old one */
        list_add(&txs, &peer->ibp_tx_queue);
        list_del_init(&peer->ibp_tx_queue);

        if (!kiblnd_peer_active(peer) ||        /* peer has been deleted */
            conn->ibc_comms_error != 0) {       /* error has happened already */
                lnet_ni_t *ni = peer->ibp_ni;

                /* start to shut down connection */
                kiblnd_close_conn_locked(conn, -ECONNABORTED);
                write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);

                /* the grabbed txs are failed outside the lock */
                kiblnd_txlist_done(ni, &txs, -ECONNABORTED);

                return;
        }

        /* +1 ref for myself, this connection is visible to other threads
         * now, refcount of peer:ibp_conns can be released by connection
         * close from either a different thread, or the calling of
         * kiblnd_check_sends_locked() below. See bz21911 for details.
         */
        kiblnd_conn_addref(conn);
        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);

        /* Schedule blocked txs */
        spin_lock(&conn->ibc_lock);
        while (!list_empty(&txs)) {
                tx = list_entry(txs.next, kib_tx_t, tx_list);
                list_del(&tx->tx_list);

                kiblnd_queue_tx_locked(tx, conn);
        }
        kiblnd_check_sends_locked(conn);
        spin_unlock(&conn->ibc_lock);

        /* schedule blocked rxs */
        kiblnd_handle_early_rxs(conn);
        /* drops the reference taken "for myself" above */
        kiblnd_conn_decref(conn);
}
2202
/*
 * Send the reject record 'rej' on 'cmid' with rdma_reject().  A failure is
 * only logged; nothing is returned to the caller.
 */
static void
kiblnd_reject(struct rdma_cm_id *cmid, kib_rej_t *rej)
{
        int err = rdma_reject(cmid, rej, sizeof(*rej));

        if (err == 0)
                return;

        CWARN("Error %d sending reject\n", err);
}
2213
2214 static int
2215 kiblnd_passive_connect(struct rdma_cm_id *cmid, void *priv, int priv_nob)
2216 {
2217         rwlock_t                *g_lock = &kiblnd_data.kib_global_lock;
2218         kib_msg_t             *reqmsg = priv;
2219         kib_msg_t             *ackmsg;
2220         kib_dev_t             *ibdev;
2221         kib_peer_t            *peer;
2222         kib_peer_t            *peer2;
2223         kib_conn_t            *conn;
2224         lnet_ni_t             *ni  = NULL;
2225         kib_net_t             *net = NULL;
2226         lnet_nid_t             nid;
2227         struct rdma_conn_param cp;
2228         kib_rej_t              rej;
2229         int                    version = IBLND_MSG_VERSION;
2230         unsigned long          flags;
2231         int                    rc;
2232         struct sockaddr_in    *peer_addr;
2233         LASSERT (!in_interrupt());
2234
2235         /* cmid inherits 'context' from the corresponding listener id */
2236         ibdev = (kib_dev_t *)cmid->context;
2237         LASSERT (ibdev != NULL);
2238
2239         memset(&rej, 0, sizeof(rej));
2240         rej.ibr_magic                = IBLND_MSG_MAGIC;
2241         rej.ibr_why                  = IBLND_REJECT_FATAL;
2242         rej.ibr_cp.ibcp_max_msg_size = IBLND_MSG_SIZE;
2243
2244         peer_addr = (struct sockaddr_in *)&(cmid->route.addr.dst_addr);
2245         if (*kiblnd_tunables.kib_require_priv_port &&
2246             ntohs(peer_addr->sin_port) >= PROT_SOCK) {
2247                 __u32 ip = ntohl(peer_addr->sin_addr.s_addr);
2248                 CERROR("Peer's port (%pI4h:%hu) is not privileged\n",
2249                        &ip, ntohs(peer_addr->sin_port));
2250                 goto failed;
2251         }
2252
2253         if (priv_nob < offsetof(kib_msg_t, ibm_type)) {
2254                 CERROR("Short connection request\n");
2255                 goto failed;
2256         }
2257
2258         /* Future protocol version compatibility support!  If the
2259          * o2iblnd-specific protocol changes, or when LNET unifies
2260          * protocols over all LNDs, the initial connection will
2261          * negotiate a protocol version.  I trap this here to avoid
2262          * console errors; the reject tells the peer which protocol I
2263          * speak. */
2264         if (reqmsg->ibm_magic == LNET_PROTO_MAGIC ||
2265             reqmsg->ibm_magic == __swab32(LNET_PROTO_MAGIC))
2266                 goto failed;
2267         if (reqmsg->ibm_magic == IBLND_MSG_MAGIC &&
2268             reqmsg->ibm_version != IBLND_MSG_VERSION &&
2269             reqmsg->ibm_version != IBLND_MSG_VERSION_1)
2270                 goto failed;
2271         if (reqmsg->ibm_magic == __swab32(IBLND_MSG_MAGIC) &&
2272             reqmsg->ibm_version != __swab16(IBLND_MSG_VERSION) &&
2273             reqmsg->ibm_version != __swab16(IBLND_MSG_VERSION_1))
2274                 goto failed;
2275
2276         rc = kiblnd_unpack_msg(reqmsg, priv_nob);
2277         if (rc != 0) {
2278                 CERROR("Can't parse connection request: %d\n", rc);
2279                 goto failed;
2280         }
2281
2282         nid = reqmsg->ibm_srcnid;
2283         ni  = lnet_net2ni(LNET_NIDNET(reqmsg->ibm_dstnid));
2284
2285         if (ni != NULL) {
2286                 net = (kib_net_t *)ni->ni_data;
2287                 rej.ibr_incarnation = net->ibn_incarnation;
2288         }
2289
2290         if (ni == NULL ||                         /* no matching net */
2291             ni->ni_nid != reqmsg->ibm_dstnid ||   /* right NET, wrong NID! */
2292             net->ibn_dev != ibdev) {              /* wrong device */
2293                 CERROR("Can't accept conn from %s on %s (%s:%d:%pI4h): "
2294                        "bad dst nid %s\n", libcfs_nid2str(nid),
2295                        ni == NULL ? "NA" : libcfs_nid2str(ni->ni_nid),
2296                        ibdev->ibd_ifname, ibdev->ibd_nnets,
2297                         &ibdev->ibd_ifip,
2298                        libcfs_nid2str(reqmsg->ibm_dstnid));
2299
2300                 goto failed;
2301         }
2302
2303        /* check time stamp as soon as possible */
2304         if (reqmsg->ibm_dststamp != 0 &&
2305             reqmsg->ibm_dststamp != net->ibn_incarnation) {
2306                 CWARN("Stale connection request\n");
2307                 rej.ibr_why = IBLND_REJECT_CONN_STALE;
2308                 goto failed;
2309         }
2310
2311         /* I can accept peer's version */
2312         version = reqmsg->ibm_version;
2313
2314         if (reqmsg->ibm_type != IBLND_MSG_CONNREQ) {
2315                 CERROR("Unexpected connreq msg type: %x from %s\n",
2316                        reqmsg->ibm_type, libcfs_nid2str(nid));
2317                 goto failed;
2318         }
2319
2320         if (reqmsg->ibm_u.connparams.ibcp_queue_depth >
2321             kiblnd_msg_queue_size(version, ni)) {
2322                 CERROR("Can't accept conn from %s, queue depth too large: "
2323                        " %d (<=%d wanted)\n",
2324                        libcfs_nid2str(nid),
2325                        reqmsg->ibm_u.connparams.ibcp_queue_depth,
2326                        kiblnd_msg_queue_size(version, ni));
2327
2328                 if (version == IBLND_MSG_VERSION)
2329                         rej.ibr_why = IBLND_REJECT_MSG_QUEUE_SIZE;
2330
2331                 goto failed;
2332         }
2333
2334         if (reqmsg->ibm_u.connparams.ibcp_max_frags >
2335             kiblnd_rdma_frags(version, ni)) {
2336                 CWARN("Can't accept conn from %s (version %x): "
2337                       "max_frags %d too large (%d wanted)\n",
2338                       libcfs_nid2str(nid), version,
2339                       reqmsg->ibm_u.connparams.ibcp_max_frags,
2340                       kiblnd_rdma_frags(version, ni));
2341
2342                 if (version >= IBLND_MSG_VERSION)
2343                         rej.ibr_why = IBLND_REJECT_RDMA_FRAGS;
2344
2345                 goto failed;
2346         } else if (reqmsg->ibm_u.connparams.ibcp_max_frags <
2347                    kiblnd_rdma_frags(version, ni) &&
2348                    net->ibn_fmr_ps == NULL) {
2349                 CWARN("Can't accept conn from %s (version %x): "
2350                       "max_frags %d incompatible without FMR pool "
2351                       "(%d wanted)\n",
2352                       libcfs_nid2str(nid), version,
2353                       reqmsg->ibm_u.connparams.ibcp_max_frags,
2354                       kiblnd_rdma_frags(version, ni));
2355
2356                 if (version == IBLND_MSG_VERSION)
2357                         rej.ibr_why = IBLND_REJECT_RDMA_FRAGS;
2358
2359                 goto failed;
2360         }
2361
2362         if (reqmsg->ibm_u.connparams.ibcp_max_msg_size > IBLND_MSG_SIZE) {
2363                 CERROR("Can't accept %s: message size %d too big (%d max)\n",
2364                        libcfs_nid2str(nid),
2365                        reqmsg->ibm_u.connparams.ibcp_max_msg_size,
2366                        IBLND_MSG_SIZE);
2367                 goto failed;
2368         }
2369
2370         /* assume 'nid' is a new peer; create  */
2371         rc = kiblnd_create_peer(ni, &peer, nid);
2372         if (rc != 0) {
2373                 CERROR("Can't create peer for %s\n", libcfs_nid2str(nid));
2374                 rej.ibr_why = IBLND_REJECT_NO_RESOURCES;
2375                 goto failed;
2376         }
2377
2378         /* We have validated the peer's parameters so use those */
2379         peer->ibp_max_frags = reqmsg->ibm_u.connparams.ibcp_max_frags;
2380         peer->ibp_queue_depth = reqmsg->ibm_u.connparams.ibcp_queue_depth;
2381
2382         write_lock_irqsave(g_lock, flags);
2383
2384         peer2 = kiblnd_find_peer_locked(nid);
2385         if (peer2 != NULL) {
2386                 if (peer2->ibp_version == 0) {
2387                         peer2->ibp_version     = version;
2388                         peer2->ibp_incarnation = reqmsg->ibm_srcstamp;
2389                 }
2390
2391                 /* not the guy I've talked with */
2392                 if (peer2->ibp_incarnation != reqmsg->ibm_srcstamp ||
2393                     peer2->ibp_version     != version) {
2394                         kiblnd_close_peer_conns_locked(peer2, -ESTALE);
2395
2396                         if (kiblnd_peer_active(peer2)) {
2397                                 peer2->ibp_incarnation = reqmsg->ibm_srcstamp;
2398                                 peer2->ibp_version = version;
2399                         }
2400                         write_unlock_irqrestore(g_lock, flags);
2401
2402                         CWARN("Conn stale %s version %x/%x incarnation %llu/%llu\n",
2403                               libcfs_nid2str(nid), peer2->ibp_version, version,
2404                               peer2->ibp_incarnation, reqmsg->ibm_srcstamp);
2405
2406                         kiblnd_peer_decref(peer);
2407                         rej.ibr_why = IBLND_REJECT_CONN_STALE;
2408                         goto failed;
2409                 }
2410
2411                 /* Tie-break connection race in favour of the higher NID.
2412                  * If we keep running into a race condition multiple times,
2413                  * we have to assume that the connection attempt with the
2414                  * higher NID is stuck in a connecting state and will never
2415                  * recover.  As such, we pass through this if-block and let
2416                  * the lower NID connection win so we can move forward.
2417                  */
2418                 if (peer2->ibp_connecting != 0 &&
2419                     nid < ni->ni_nid && peer2->ibp_races <
2420                     MAX_CONN_RACES_BEFORE_ABORT) {
2421                         peer2->ibp_races++;
2422                         write_unlock_irqrestore(g_lock, flags);
2423
2424                         CDEBUG(D_NET, "Conn race %s\n",
2425                                libcfs_nid2str(peer2->ibp_nid));
2426
2427                         kiblnd_peer_decref(peer);
2428                         rej.ibr_why = IBLND_REJECT_CONN_RACE;
2429                         goto failed;
2430                 }
2431                 if (peer2->ibp_races >= MAX_CONN_RACES_BEFORE_ABORT)
2432                         CNETERR("Conn race %s: unresolved after %d attempts, letting lower NID win\n",
2433                                 libcfs_nid2str(peer2->ibp_nid),
2434                                 MAX_CONN_RACES_BEFORE_ABORT);
2435                 /*
2436                  * passive connection is allowed even this peer is waiting for
2437                  * reconnection.
2438                  */
2439                 peer2->ibp_reconnecting = 0;
2440                 peer2->ibp_races = 0;
2441                 peer2->ibp_accepting++;
2442                 kiblnd_peer_addref(peer2);
2443
2444                 /* Race with kiblnd_launch_tx (active connect) to create peer
2445                  * so copy validated parameters since we now know what the
2446                  * peer's limits are */
2447                 peer2->ibp_max_frags = peer->ibp_max_frags;
2448                 peer2->ibp_queue_depth = peer->ibp_queue_depth;
2449
2450                 write_unlock_irqrestore(g_lock, flags);
2451                 kiblnd_peer_decref(peer);
2452                 peer = peer2;
2453         } else {
2454                 /* Brand new peer */
2455                 LASSERT (peer->ibp_accepting == 0);
2456                 LASSERT (peer->ibp_version == 0 &&
2457                          peer->ibp_incarnation == 0);
2458
2459                 peer->ibp_accepting   = 1;
2460                 peer->ibp_version     = version;
2461                 peer->ibp_incarnation = reqmsg->ibm_srcstamp;
2462
2463                 /* I have a ref on ni that prevents it being shutdown */
2464                 LASSERT (net->ibn_shutdown == 0);
2465
2466                 kiblnd_peer_addref(peer);
2467                 list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));
2468
2469                 write_unlock_irqrestore(g_lock, flags);
2470         }
2471
2472         conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_PASSIVE_WAIT, version);
2473         if (conn == NULL) {
2474                 kiblnd_peer_connect_failed(peer, 0, -ENOMEM);
2475                 kiblnd_peer_decref(peer);
2476                 rej.ibr_why = IBLND_REJECT_NO_RESOURCES;
2477                 goto failed;
2478         }
2479
2480         /* conn now "owns" cmid, so I return success from here on to ensure the
2481          * CM callback doesn't destroy cmid. */
2482         conn->ibc_incarnation      = reqmsg->ibm_srcstamp;
2483         conn->ibc_credits          = conn->ibc_queue_depth;
2484         conn->ibc_reserved_credits = conn->ibc_queue_depth;
2485         LASSERT(conn->ibc_credits + conn->ibc_reserved_credits +
2486                 IBLND_OOB_MSGS(version) <= IBLND_RX_MSGS(conn));
2487
2488         ackmsg = &conn->ibc_connvars->cv_msg;
2489         memset(ackmsg, 0, sizeof(*ackmsg));
2490
2491         kiblnd_init_msg(ackmsg, IBLND_MSG_CONNACK,
2492                         sizeof(ackmsg->ibm_u.connparams));
2493         ackmsg->ibm_u.connparams.ibcp_queue_depth  = conn->ibc_queue_depth;
2494         ackmsg->ibm_u.connparams.ibcp_max_frags    = conn->ibc_max_frags;
2495         ackmsg->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;
2496
2497         kiblnd_pack_msg(ni, ackmsg, version, 0, nid, reqmsg->ibm_srcstamp);
2498
2499         memset(&cp, 0, sizeof(cp));
2500         cp.private_data        = ackmsg;
2501         cp.private_data_len    = ackmsg->ibm_nob;
2502         cp.responder_resources = 0;             /* No atomic ops or RDMA reads */
2503         cp.initiator_depth     = 0;
2504         cp.flow_control        = 1;
2505         cp.retry_count         = *kiblnd_tunables.kib_retry_count;
2506         cp.rnr_retry_count     = *kiblnd_tunables.kib_rnr_retry_count;
2507
2508         CDEBUG(D_NET, "Accept %s\n", libcfs_nid2str(nid));
2509
2510         rc = rdma_accept(cmid, &cp);
2511         if (rc != 0) {
2512                 CERROR("Can't accept %s: %d\n", libcfs_nid2str(nid), rc);
2513                 rej.ibr_version = version;
2514                 rej.ibr_why     = IBLND_REJECT_FATAL;
2515
2516                 kiblnd_reject(cmid, &rej);
2517                 kiblnd_connreq_done(conn, rc);
2518                 kiblnd_conn_decref(conn);
2519         }
2520
2521         lnet_ni_decref(ni);
2522         return 0;
2523
2524  failed:
2525         if (ni != NULL) {
2526                 rej.ibr_cp.ibcp_queue_depth =
2527                         kiblnd_msg_queue_size(version, ni);
2528                 rej.ibr_cp.ibcp_max_frags   = kiblnd_rdma_frags(version, ni);
2529                 lnet_ni_decref(ni);
2530         }
2531
2532         rej.ibr_version = version;
2533         kiblnd_reject(cmid, &rej);
2534
2535         return -ECONNREFUSED;
2536 }
2537
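/*
 * Called when the active connection attempt 'conn' has been rejected:
 * decide whether the connection should be retried and, if so, record the
 * parameters the retry will use on the peer.
 *
 * conn         rejected connection; asserted to be IBLND_CONN_ACTIVE_CONNECT
 * version      protocol version to store in peer->ibp_version for the retry
 * incarnation  peer incarnation to store; 0 leaves ibp_incarnation untouched
 * why          IBLND_REJECT_* code selecting what has to be renegotiated
 * cp           connection parameters that came with the reject, or NULL
 *
 * For consumer-defined rejects kiblnd_rejected() takes 'version',
 * 'incarnation', 'why' and '*cp' from the peer's reject message, so they are
 * remote-controlled values.
 *
 * The function only flags the retry (conn->ibc_reconnect and
 * peer->ibp_reconnecting); it does not start it.  One summary line is logged
 * on every path.
 */
static void
kiblnd_check_reconnect(kib_conn_t *conn, int version,
                       __u64 incarnation, int why, kib_connparams_t *cp)
{
        rwlock_t        *glock = &kiblnd_data.kib_global_lock;
        kib_peer_t      *peer = conn->ibc_peer;
        const char      *reason;
        int              msg_size = IBLND_MSG_SIZE;
        int              frag_num = -1;
        int              queue_dep = -1;
        bool             reconnect;
        unsigned long    flags;

        LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
        LASSERT(peer->ibp_connecting > 0);      /* 'conn' at least */
        LASSERT(!peer->ibp_reconnecting);

        /* without 'cp' the log line below shows -1 for the peer's values */
        if (cp) {
                msg_size        = cp->ibcp_max_msg_size;
                frag_num        = cp->ibcp_max_frags;
                queue_dep       = cp->ibcp_queue_depth;
        }

        write_lock_irqsave(glock, flags);
        /* retry connection if it's still needed and no other connection
         * attempts (active or passive) are in progress
         * NB: reconnect is still needed even when ibp_tx_queue is
         * empty if ibp_version != version because reconnect may be
         * initiated by kiblnd_query() */
        reconnect = (!list_empty(&peer->ibp_tx_queue) ||
                     peer->ibp_version != version) &&
                    peer->ibp_connecting == 1 &&
                    peer->ibp_accepting == 0;
        if (!reconnect) {
                reason = "no need";
                goto out;
        }

        switch (why) {
        default:
                reason = "Unknown";
                break;

        case IBLND_REJECT_RDMA_FRAGS: {
                struct lnet_ioctl_config_lnd_tunables *tunables;

                if (!cp) {
                        reason = "can't negotiate max frags";
                        goto no_reconnect;
                }
                tunables = peer->ibp_ni->ni_lnd_tunables;
                if (!tunables->lt_tun_u.lt_o2ib.lnd_map_on_demand) {
                        reason = "map_on_demand must be enabled";
                        goto no_reconnect;
                }
                /* only a strictly smaller fragment count is accepted */
                if (conn->ibc_max_frags <= frag_num) {
                        reason = "unsupported max frags";
                        goto no_reconnect;
                }

                /* NOTE(review): frag_num comes from the reject message and
                 * has no lower bound here; confirm that a value of 0 is
                 * refused or harmless where ibp_max_frags is consumed. */
                peer->ibp_max_frags = frag_num;
                reason = "rdma fragments";
                break;
        }
        case IBLND_REJECT_MSG_QUEUE_SIZE:
                if (!cp) {
                        reason = "can't negotiate queue depth";
                        goto no_reconnect;
                }
                /* only a strictly smaller queue depth is accepted */
                if (conn->ibc_queue_depth <= queue_dep) {
                        reason = "unsupported queue depth";
                        goto no_reconnect;
                }

                /* NOTE(review): same as above, queue_dep has no lower
                 * bound here; confirm 0 is handled by the consumer. */
                peer->ibp_queue_depth = queue_dep;
                reason = "queue depth";
                break;

        case IBLND_REJECT_CONN_STALE:
                reason = "stale";
                break;

        case IBLND_REJECT_CONN_RACE:
                reason = "conn race";
                break;

        case IBLND_REJECT_CONN_UNCOMPAT:
                reason = "version negotiation";
                break;
        }

        conn->ibc_reconnect = 1;
        peer->ibp_reconnecting = 1;
        peer->ibp_version = version;
        if (incarnation != 0)
                peer->ibp_incarnation = incarnation;
        goto out;

 no_reconnect:
        /* Negotiation was abandoned and nothing was flagged for retry, so
         * the log line must not claim "reconnect". */
        reconnect = false;
 out:
        write_unlock_irqrestore(glock, flags);

        CNETERR("%s: %s (%s), %x, %x, msg_size: %d, queue_depth: %d/%d, max_frags: %d/%d\n",
                libcfs_nid2str(peer->ibp_nid),
                reconnect ? "reconnect" : "don't reconnect",
                reason, IBLND_MSG_VERSION, version, msg_size,
                conn->ibc_queue_depth, queue_dep,
                conn->ibc_max_frags, frag_num);
        /*
         * if conn::ibc_reconnect is TRUE, connd will reconnect to the peer
         * while destroying the zombie
         */
}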
2648
/*
 * Handle a CM reject of the active connection attempt 'conn'.
 *
 * conn      connection being set up; asserted to be IBLND_CONN_ACTIVE_CONNECT
 * reason    reject reason reported with the CM event
 * priv      private data that arrived with the reject (written by the peer)
 * priv_nob  number of bytes available at 'priv'
 *
 * For a consumer-defined reject the private data is parsed as a kib_rej_t.
 * It is byte-swapped in place when its magic shows the opposite byte order,
 * and the extended fields are only touched after priv_nob has been checked
 * against the structure size.  Reject reasons that can be renegotiated are
 * handed to kiblnd_check_reconnect(); everything else is only logged.  The
 * connection request is always completed with -ECONNREFUSED.
 */
static void
kiblnd_rejected (kib_conn_t *conn, int reason, void *priv, int priv_nob)
{
        kib_peer_t       *peer    = conn->ibc_peer;
        kib_rej_t        *rej     = priv;
        kib_connparams_t *params  = NULL;
        bool              swabbed = false;
        /* NB. the incarnation defaults to -1 because:
         * a) V1 ignores the dst incarnation in a connreq.
         * b) V2 provides its incarnation when it rejects me, which
         *    overwrites the -1.
         *
         * If I connect to a V1 peer with the V2 protocol and get rejected,
         * and the peer is then upgraded to V2 without my knowing, my V1
         * retry carries incarnation -1, so the upgraded peer can tell I am
         * talking to its predecessor and reject me.
         */
        __u64             stamp   = -1;

        LASSERT (!in_interrupt());
        LASSERT (conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);

        switch (reason) {
        case IB_CM_REJ_STALE_CONN:
                kiblnd_check_reconnect(conn, IBLND_MSG_VERSION, 0,
                                       IBLND_REJECT_CONN_STALE, NULL);
                break;

        case IB_CM_REJ_INVALID_SERVICE_ID:
                CNETERR("%s rejected: no listener at %d\n",
                        libcfs_nid2str(peer->ibp_nid),
                        *kiblnd_tunables.kib_service);
                break;

        case IB_CM_REJ_CONSUMER_DEFINED:
                /* too short to hold even the basic reject header:
                 * report it like any other reject */
                if (priv_nob < offsetof(kib_rej_t, ibr_padding))
                        goto generic;

                /* peer has the opposite byte order: fix the header up */
                if (rej->ibr_magic == __swab32(IBLND_MSG_MAGIC) ||
                    rej->ibr_magic == __swab32(LNET_PROTO_MAGIC)) {
                        __swab32s(&rej->ibr_magic);
                        __swab16s(&rej->ibr_version);
                        swabbed = true;
                }

                /* priv_nob is always 148 with current OFED (see the
                 * definition of IB_CM_REJ_PRIVATE_DATA_SIZE), so the size
                 * alone proves nothing: the version has to be checked too
                 * before the extended fields are trusted. */
                if (priv_nob >= sizeof(kib_rej_t) &&
                    rej->ibr_version > IBLND_MSG_VERSION_1) {
                        params = &rej->ibr_cp;

                        if (swabbed) {
                                __swab64s(&rej->ibr_incarnation);
                                __swab16s(&params->ibcp_queue_depth);
                                __swab16s(&params->ibcp_max_frags);
                                __swab32s(&params->ibcp_max_msg_size);
                        }

                        stamp = rej->ibr_incarnation;
                }

                /* neither magic matches: not a reject I can interpret */
                if (rej->ibr_magic != IBLND_MSG_MAGIC &&
                    rej->ibr_magic != LNET_PROTO_MAGIC) {
                        CERROR("%s rejected: consumer defined fatal error\n",
                               libcfs_nid2str(peer->ibp_nid));
                        break;
                }

                /* only the current version and V1 are understood */
                if (rej->ibr_version != IBLND_MSG_VERSION &&
                    rej->ibr_version != IBLND_MSG_VERSION_1) {
                        CERROR("%s rejected: o2iblnd version %x error\n",
                               libcfs_nid2str(peer->ibp_nid),
                               rej->ibr_version);
                        break;
                }

                /* a fatal reject from a V1 peer is turned into a version
                 * mismatch unless this attempt already used V1 */
                if (rej->ibr_why     == IBLND_REJECT_FATAL &&
                    rej->ibr_version == IBLND_MSG_VERSION_1) {
                        CDEBUG(D_NET, "rejected by old version peer %s: %x\n",
                               libcfs_nid2str(peer->ibp_nid), rej->ibr_version);

                        if (conn->ibc_version != IBLND_MSG_VERSION_1)
                                rej->ibr_why = IBLND_REJECT_CONN_UNCOMPAT;
                }

                switch (rej->ibr_why) {
                case IBLND_REJECT_CONN_RACE:
                case IBLND_REJECT_CONN_STALE:
                case IBLND_REJECT_CONN_UNCOMPAT:
                case IBLND_REJECT_MSG_QUEUE_SIZE:
                case IBLND_REJECT_RDMA_FRAGS:
                        /* these may be worth a retry */
                        kiblnd_check_reconnect(conn, rej->ibr_version,
                                               stamp, rej->ibr_why, params);
                        break;

                case IBLND_REJECT_NO_RESOURCES:
                        CERROR("%s rejected: o2iblnd no resources\n",
                               libcfs_nid2str(peer->ibp_nid));
                        break;

                case IBLND_REJECT_FATAL:
                        CERROR("%s rejected: o2iblnd fatal error\n",
                               libcfs_nid2str(peer->ibp_nid));
                        break;

                default:
                        CERROR("%s rejected: o2iblnd reason %d\n",
                               libcfs_nid2str(peer->ibp_nid),
                               rej->ibr_why);
                        break;
                }
                break;

        default:
 generic:
                CNETERR("%s rejected: reason %d, size %d\n",
                        libcfs_nid2str(peer->ibp_nid), reason, priv_nob);
                break;
        }

        kiblnd_connreq_done(conn, -ECONNREFUSED);
}
2773
/*
 * Validate the CONNACK the peer sent in reply to my connection request and
 * complete the connection attempt on 'conn'.
 *
 * conn      connection the reply belongs to
 * priv      private data that arrived with the reply (written by the peer)
 * priv_nob  number of bytes available at 'priv'
 *
 * The reply has to unpack cleanly, be a CONNACK of the version I asked for,
 * must not exceed my queue depth, max_frags or message size, and must be
 * addressed to this NI's NID and current incarnation.  On success the
 * peer's queue depth and max_frags are adopted for the connection.  On any
 * failure the error is stored in conn->ibc_comms_error.
 *
 * NOTE(review): the negotiated values are only bounded from above here;
 * confirm that a reply carrying 0 is caught elsewhere or is harmless.
 */
static void
kiblnd_check_connreply (kib_conn_t *conn, void *priv, int priv_nob)
{
        kib_peer_t       *peer = conn->ibc_peer;
        lnet_ni_t        *ni   = peer->ibp_ni;
        kib_net_t        *net  = ni->ni_data;
        kib_msg_t        *ack  = priv;
        kib_connparams_t *cp   = &ack->ibm_u.connparams;
        int               want = conn->ibc_version;
        unsigned long     flags;
        int               rc;

        rc = kiblnd_unpack_msg(ack, priv_nob);

        LASSERT (net != NULL);

        if (rc != 0) {
                CERROR("Can't unpack connack from %s: %d\n",
                       libcfs_nid2str(peer->ibp_nid), rc);
                goto failed;
        }

        /* every remaining format violation is reported as -EPROTO */
        rc = -EPROTO;

        if (ack->ibm_type != IBLND_MSG_CONNACK) {
                CERROR("Unexpected message %d from %s\n",
                       ack->ibm_type, libcfs_nid2str(peer->ibp_nid));
                goto failed;
        }

        if (want != ack->ibm_version) {
                CERROR("%s replied version %x is different with "
                       "requested version %x\n",
                       libcfs_nid2str(peer->ibp_nid), ack->ibm_version, want);
                goto failed;
        }

        if (cp->ibcp_queue_depth > conn->ibc_queue_depth) {
                CERROR("%s has incompatible queue depth %d (<=%d wanted)\n",
                       libcfs_nid2str(peer->ibp_nid),
                       cp->ibcp_queue_depth,
                       conn->ibc_queue_depth);
                goto failed;
        }

        if (cp->ibcp_max_frags > conn->ibc_max_frags) {
                CERROR("%s has incompatible max_frags %d (<=%d wanted)\n",
                       libcfs_nid2str(peer->ibp_nid),
                       cp->ibcp_max_frags,
                       conn->ibc_max_frags);
                goto failed;
        }

        if (cp->ibcp_max_msg_size > IBLND_MSG_SIZE) {
                CERROR("%s max message size %d too big (%d max)\n",
                       libcfs_nid2str(peer->ibp_nid),
                       cp->ibcp_max_msg_size,
                       IBLND_MSG_SIZE);
                goto failed;
        }

        /* the reply must be meant for this NI and its current incarnation */
        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
        rc = (ack->ibm_dstnid == ni->ni_nid &&
              ack->ibm_dststamp == net->ibn_incarnation) ? 0 : -ESTALE;
        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);

        if (rc != 0) {
                CERROR("Bad connection reply from %s, rc = %d, "
                       "version: %x max_frags: %d\n",
                       libcfs_nid2str(peer->ibp_nid), rc,
                       ack->ibm_version, cp->ibcp_max_frags);
                goto failed;
        }

        /* adopt what the peer granted */
        conn->ibc_incarnation      = ack->ibm_srcstamp;
        conn->ibc_queue_depth      = cp->ibcp_queue_depth;
        conn->ibc_credits          = cp->ibcp_queue_depth;
        conn->ibc_reserved_credits = cp->ibcp_queue_depth;
        conn->ibc_max_frags        = cp->ibcp_max_frags;
        LASSERT(conn->ibc_credits + conn->ibc_reserved_credits +
                IBLND_OOB_MSGS(want) <= IBLND_RX_MSGS(conn));

        kiblnd_connreq_done(conn, 0);
        return;

 failed:
        /* NB My QP has already established itself, so anything that goes
         * wrong here is handled by setting ibc_comms_error.
         * kiblnd_connreq_done(0) moves the conn state to ESTABLISHED, but
         * then immediately tears it down. */

        LASSERT (rc != 0);
        conn->ibc_comms_error = rc;
        kiblnd_connreq_done(conn, 0);
}
2874
/*
 * Start the active side of a connection on 'cmid'.
 *
 * cmid->context holds the peer on entry.  A connection is created for it,
 * a CONNREQ carrying my queue depth, max_frags and maximum message size is
 * built in the connection's cv_msg buffer, and rdma_connect() is called
 * with that message as private data.
 *
 * Returns -ENOMEM when no connection could be created, in which case the
 * peer reference held for cmid is dropped.  In every other case 0 is
 * returned, including when rdma_connect() fails, because by then the
 * connection owns cmid and cleans up itself.
 */
static int
kiblnd_active_connect (struct rdma_cm_id *cmid)
{
        kib_peer_t              *peer = (kib_peer_t *)cmid->context;
        kib_conn_t              *conn;
        kib_msg_t               *req;
        struct rdma_conn_param   param;
        unsigned long            flags;
        __u64                    stamp;
        int                      ver;
        int                      rc;

        /* sample the peer's version and incarnation under the global lock */
        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);

        stamp = peer->ibp_incarnation;
        ver   = peer->ibp_version;
        if (ver == 0)           /* nothing recorded yet: use my own version */
                ver = IBLND_MSG_VERSION;

        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);

        conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_ACTIVE_CONNECT, ver);
        if (conn == NULL) {
                kiblnd_peer_connect_failed(peer, 1, -ENOMEM);
                kiblnd_peer_decref(peer); /* lose cmid's ref */
                return -ENOMEM;
        }

        /* conn "owns" cmid now, so I return success from here on to ensure
         * the CM callback doesn't destroy cmid. conn also takes over cmid's
         * ref on peer */

        req = &conn->ibc_connvars->cv_msg;
        memset(req, 0, sizeof(*req));

        kiblnd_init_msg(req, IBLND_MSG_CONNREQ, sizeof(req->ibm_u.connparams));
        req->ibm_u.connparams.ibcp_queue_depth  = conn->ibc_queue_depth;
        req->ibm_u.connparams.ibcp_max_frags    = conn->ibc_max_frags;
        req->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;

        kiblnd_pack_msg(peer->ibp_ni, req, ver, 0, peer->ibp_nid, stamp);

        /* the packed CONNREQ travels as CM private data */
        memset(&param, 0, sizeof(param));
        param.private_data        = req;
        param.private_data_len    = req->ibm_nob;
        param.responder_resources = 0;          /* No atomic ops or RDMA reads */
        param.initiator_depth     = 0;
        param.flow_control        = 1;
        param.retry_count         = *kiblnd_tunables.kib_retry_count;
        param.rnr_retry_count     = *kiblnd_tunables.kib_rnr_retry_count;

        LASSERT(cmid->context == (void *)conn);
        LASSERT(conn->ibc_cmid == cmid);

        rc = rdma_connect(cmid, &param);
        if (rc == 0)
                return 0;

        CERROR("Can't connect to %s: %d\n",
               libcfs_nid2str(peer->ibp_nid), rc);
        kiblnd_connreq_done(conn, rc);
        kiblnd_conn_decref(conn);

        return 0;
}
2940
2941 int
2942 kiblnd_cm_callback(struct rdma_cm_id *cmid, struct rdma_cm_event *event)
2943 {
2944         kib_peer_t  *peer;
2945         kib_conn_t  *conn;
2946         int          rc;
2947
2948         switch (event->event) {
2949         default:
2950                 CERROR("Unexpected event: %d, status: %d\n",
2951                        event->event, event->status);
2952                 LBUG();
2953
2954         case RDMA_CM_EVENT_CONNECT_REQUEST:
2955                 /* destroy cmid on failure */
2956                 rc = kiblnd_passive_connect(cmid, 
2957                                             (void *)KIBLND_CONN_PARAM(event),
2958                                             KIBLND_CONN_PARAM_LEN(event));
2959                 CDEBUG(D_NET, "connreq: %d\n", rc);
2960                 return rc;
2961                 
2962         case RDMA_CM_EVENT_ADDR_ERROR:
2963                 peer = (kib_peer_t *)cmid->context;
2964                 CNETERR("%s: ADDR ERROR %d\n",
2965                        libcfs_nid2str(peer->ibp_nid), event->status);
2966                 kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
2967                 kiblnd_peer_decref(peer);
2968                 return -EHOSTUNREACH;      /* rc != 0 destroys cmid */
2969
2970         case RDMA_CM_EVENT_ADDR_RESOLVED:
2971                 peer = (kib_peer_t *)cmid->context;
2972
2973                 CDEBUG(D_NET,"%s Addr resolved: %d\n",
2974                        libcfs_nid2str(peer->ibp_nid), event->status);
2975
2976                 if (event->status != 0) {
2977                         CNETERR("Can't resolve address for %s: %d\n",
2978                                 libcfs_nid2str(peer->ibp_nid), event->status);
2979                         rc = event->status;
2980                 } else {
2981                         rc = rdma_resolve_route(
2982                                 cmid, *kiblnd_tunables.kib_timeout * 1000);
2983                         if (rc == 0)
2984                                 return 0;
2985                         /* Can't initiate route resolution */
2986                         CERROR("Can't resolve route for %s: %d\n",
2987                                libcfs_nid2str(peer->ibp_nid), rc);
2988                 }
2989                 kiblnd_peer_connect_failed(peer, 1, rc);
2990                 kiblnd_peer_decref(peer);
2991                 return rc;                      /* rc != 0 destroys cmid */
2992
2993         case RDMA_CM_EVENT_ROUTE_ERROR:
2994                 peer = (kib_peer_t *)cmid->context;
2995                 CNETERR("%s: ROUTE ERROR %d\n",
2996                         libcfs_nid2str(peer->ibp_nid), event->status);
2997                 kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
2998                 kiblnd_peer_decref(peer);
2999                 return -EHOSTUNREACH;           /* rc != 0 destroys cmid */
3000
3001         case RDMA_CM_EVENT_ROUTE_RESOLVED:
3002                 peer = (kib_peer_t *)cmid->context;
3003                 CDEBUG(D_NET,"%s Route resolved: %d\n",
3004                        libcfs_nid2str(peer->ibp_nid), event->status);
3005
3006                 if (event->status == 0)
3007                         return kiblnd_active_connect(cmid);
3008
3009                 CNETERR("Can't resolve route for %s: %d\n",
3010                        libcfs_nid2str(peer->ibp_nid), event->status);
3011                 kiblnd_peer_connect_failed(peer, 1, event->status);
3012                 kiblnd_peer_decref(peer);
3013                 return event->status;           /* rc != 0 destroys cmid */
3014                 
3015         case RDMA_CM_EVENT_UNREACHABLE:
3016                 conn = (kib_conn_t *)cmid->context;
3017                 LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
3018                         conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
3019                 CNETERR("%s: UNREACHABLE %d\n",
3020                        libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
3021                 kiblnd_connreq_done(conn, -ENETDOWN);
3022                 kiblnd_conn_decref(conn);
3023                 return 0;
3024
3025         case RDMA_CM_EVENT_CONNECT_ERROR:
3026                 conn = (kib_conn_t *)cmid->context;
3027                 LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
3028                         conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
3029                 CNETERR("%s: CONNECT ERROR %d\n",
3030                         libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
3031                 kiblnd_connreq_done(conn, -ENOTCONN);
3032                 kiblnd_conn_decref(conn);
3033                 return 0;
3034
3035         case RDMA_CM_EVENT_REJECTED:
3036                 conn = (kib_conn_t *)cmid->context;
3037                 switch (conn->ibc_state) {
3038                 default:
3039                         LBUG();
3040
3041                 case IBLND_CONN_PASSIVE_WAIT:
3042                         CERROR ("%s: REJECTED %d\n",
3043                                 libcfs_nid2str(conn->ibc_peer->ibp_nid),
3044                                 event->status);
3045                         kiblnd_connreq_done(conn, -ECONNRESET);
3046                         break;
3047
3048                 case IBLND_CONN_ACTIVE_CONNECT:
3049                         kiblnd_rejected(conn, event->status,
3050                                         (void *)KIBLND_CONN_PARAM(event),
3051                                         KIBLND_CONN_PARAM_LEN(event));
3052                         break;
3053                 }
3054                 kiblnd_conn_decref(conn);
3055                 return 0;
3056
3057         case RDMA_CM_EVENT_ESTABLISHED:
3058                 conn = (kib_conn_t *)cmid->context;
3059                 switch (conn->ibc_state) {
3060                 default:
3061                         LBUG();
3062
3063                 case IBLND_CONN_PASSIVE_WAIT:
3064                         CDEBUG(D_NET, "ESTABLISHED (passive): %s\n",
3065                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
3066                         kiblnd_connreq_done(conn, 0);
3067                         break;
3068
3069                 case IBLND_CONN_ACTIVE_CONNECT:
3070                         CDEBUG(D_NET, "ESTABLISHED(active): %s\n",
3071                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
3072                         kiblnd_check_connreply(conn,
3073                                                (void *)KIBLND_CONN_PARAM(event),
3074                                                KIBLND_CONN_PARAM_LEN(event));
3075                         break;
3076                 }
3077                 /* net keeps its ref on conn! */
3078                 return 0;
3079
3080         case RDMA_CM_EVENT_TIMEWAIT_EXIT:
3081                 CDEBUG(D_NET, "Ignore TIMEWAIT_EXIT event\n");
3082                 return 0;
3083
3084         case RDMA_CM_EVENT_DISCONNECTED:
3085                 conn = (kib_conn_t *)cmid->context;
3086                 if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
3087                         CERROR("%s DISCONNECTED\n",
3088                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
3089                         kiblnd_connreq_done(conn, -ECONNRESET);
3090                 } else {
3091                         kiblnd_close_conn(conn, 0);
3092                 }
3093                 kiblnd_conn_decref(conn);
3094                 cmid->context = NULL;