Whamcloud - gitweb
b95a04ffca988f0fa12edeaa8d5343c5d83e7563
[fs/lustre-release.git] / lnet / klnds / o2iblnd / o2iblnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/o2iblnd/o2iblnd_cb.c
37  *
38  * Author: Eric Barton <eric@bartonsoftware.com>
39  */
40
41 #include "o2iblnd.h"
42
43 void
44 kiblnd_tx_done (lnet_ni_t *ni, kib_tx_t *tx)
45 {
46         lnet_msg_t *lntmsg[2];
47         kib_net_t  *net = ni->ni_data;
48         int         rc;
49         int         i;
50
51         LASSERT (net != NULL);
52         LASSERT (!cfs_in_interrupt());
53         LASSERT (!tx->tx_queued);               /* mustn't be queued for sending */
54         LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting sent callback */
55         LASSERT (!tx->tx_waiting);              /* mustn't be awaiting peer response */
56         LASSERT (tx->tx_pool != NULL);
57
58         kiblnd_unmap_tx(ni, tx);
59
60         /* tx may have up to 2 lnet msgs to finalise */
61         lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
62         lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
63         rc = tx->tx_status;
64
65         if (tx->tx_conn != NULL) {
66                 LASSERT (ni == tx->tx_conn->ibc_peer->ibp_ni);
67
68                 kiblnd_conn_decref(tx->tx_conn);
69                 tx->tx_conn = NULL;
70         }
71
72         tx->tx_nwrq = 0;
73         tx->tx_status = 0;
74
75         kiblnd_pool_free_node(&tx->tx_pool->tpo_pool, &tx->tx_list);
76
77         /* delay finalize until my descs have been freed */
78         for (i = 0; i < 2; i++) {
79                 if (lntmsg[i] == NULL)
80                         continue;
81
82                 lnet_finalize(ni, lntmsg[i], rc);
83         }
84 }
85
86 void
87 kiblnd_txlist_done (lnet_ni_t *ni, cfs_list_t *txlist, int status)
88 {
89         kib_tx_t *tx;
90
91         while (!cfs_list_empty (txlist)) {
92                 tx = cfs_list_entry (txlist->next, kib_tx_t, tx_list);
93
94                 cfs_list_del(&tx->tx_list);
95                 /* complete now */
96                 tx->tx_waiting = 0;
97                 tx->tx_status = status;
98                 kiblnd_tx_done(ni, tx);
99         }
100 }
101
102 kib_tx_t *
103 kiblnd_get_idle_tx (lnet_ni_t *ni)
104 {
105         kib_net_t            *net = (kib_net_t *)ni->ni_data;
106         cfs_list_t           *node;
107         kib_tx_t             *tx;
108
109         node = kiblnd_pool_alloc_node(&net->ibn_tx_ps.tps_poolset);
110         if (node == NULL)
111                 return NULL;
112         tx = container_of(node, kib_tx_t, tx_list);
113
114         LASSERT (tx->tx_nwrq == 0);
115         LASSERT (!tx->tx_queued);
116         LASSERT (tx->tx_sending == 0);
117         LASSERT (!tx->tx_waiting);
118         LASSERT (tx->tx_status == 0);
119         LASSERT (tx->tx_conn == NULL);
120         LASSERT (tx->tx_lntmsg[0] == NULL);
121         LASSERT (tx->tx_lntmsg[1] == NULL);
122         LASSERT (tx->tx_u.pmr == NULL);
123         LASSERT (tx->tx_nfrags == 0);
124
125         return tx;
126 }
127
128 void
129 kiblnd_drop_rx (kib_rx_t *rx)
130 {
131         kib_conn_t         *conn = rx->rx_conn;
132         unsigned long       flags;
133         
134         cfs_spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);
135         LASSERT (conn->ibc_nrx > 0);
136         conn->ibc_nrx--;
137         cfs_spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock, flags);
138
139         kiblnd_conn_decref(conn);
140 }
141
142 int
143 kiblnd_post_rx (kib_rx_t *rx, int credit)
144 {
145         kib_conn_t         *conn = rx->rx_conn;
146         kib_net_t          *net = conn->ibc_peer->ibp_ni->ni_data;
147         struct ib_recv_wr  *bad_wrq = NULL;
148         struct ib_mr       *mr;
149         int                 rc;
150
151         LASSERT (net != NULL);
152         LASSERT (!cfs_in_interrupt());
153         LASSERT (credit == IBLND_POSTRX_NO_CREDIT ||
154                  credit == IBLND_POSTRX_PEER_CREDIT ||
155                  credit == IBLND_POSTRX_RSRVD_CREDIT);
156
157         mr = kiblnd_find_dma_mr(conn->ibc_hdev, rx->rx_msgaddr, IBLND_MSG_SIZE);
158         LASSERT (mr != NULL);
159
160         rx->rx_sge.lkey   = mr->lkey;
161         rx->rx_sge.addr   = rx->rx_msgaddr;
162         rx->rx_sge.length = IBLND_MSG_SIZE;
163
164         rx->rx_wrq.next = NULL;
165         rx->rx_wrq.sg_list = &rx->rx_sge;
166         rx->rx_wrq.num_sge = 1;
167         rx->rx_wrq.wr_id = kiblnd_ptr2wreqid(rx, IBLND_WID_RX);
168
169         LASSERT (conn->ibc_state >= IBLND_CONN_INIT);
170         LASSERT (rx->rx_nob >= 0);              /* not posted */
171
172         if (conn->ibc_state > IBLND_CONN_ESTABLISHED) {
173                 kiblnd_drop_rx(rx);             /* No more posts for this rx */
174                 return 0;
175         }
176
177         rx->rx_nob = -1;                        /* flag posted */
178
179         rc = ib_post_recv(conn->ibc_cmid->qp, &rx->rx_wrq, &bad_wrq);
180         if (rc != 0) {
181                 CERROR("Can't post rx for %s: %d, bad_wrq: %p\n",
182                        libcfs_nid2str(conn->ibc_peer->ibp_nid), rc, bad_wrq);
183                 rx->rx_nob = 0;
184         }
185
186         if (conn->ibc_state < IBLND_CONN_ESTABLISHED) /* Initial post */
187                 return rc;
188
189         if (rc != 0) {
190                 kiblnd_close_conn(conn, rc);
191                 kiblnd_drop_rx(rx);             /* No more posts for this rx */
192                 return rc;
193         }
194
195         if (credit == IBLND_POSTRX_NO_CREDIT)
196                 return 0;
197
198         cfs_spin_lock(&conn->ibc_lock);
199         if (credit == IBLND_POSTRX_PEER_CREDIT)
200                 conn->ibc_outstanding_credits++;
201         else
202                 conn->ibc_reserved_credits++;
203         cfs_spin_unlock(&conn->ibc_lock);
204
205         kiblnd_check_sends(conn);
206         return 0;
207 }
208
209 kib_tx_t *
210 kiblnd_find_waiting_tx_locked(kib_conn_t *conn, int txtype, __u64 cookie)
211 {
212         cfs_list_t   *tmp;
213
214         cfs_list_for_each(tmp, &conn->ibc_active_txs) {
215                 kib_tx_t *tx = cfs_list_entry(tmp, kib_tx_t, tx_list);
216
217                 LASSERT (!tx->tx_queued);
218                 LASSERT (tx->tx_sending != 0 || tx->tx_waiting);
219
220                 if (tx->tx_cookie != cookie)
221                         continue;
222
223                 if (tx->tx_waiting &&
224                     tx->tx_msg->ibm_type == txtype)
225                         return tx;
226
227                 CWARN("Bad completion: %swaiting, type %x (wanted %x)\n",
228                       tx->tx_waiting ? "" : "NOT ",
229                       tx->tx_msg->ibm_type, txtype);
230         }
231         return NULL;
232 }
233
234 void
235 kiblnd_handle_completion(kib_conn_t *conn, int txtype, int status, __u64 cookie)
236 {
237         kib_tx_t    *tx;
238         lnet_ni_t   *ni = conn->ibc_peer->ibp_ni;
239         int          idle;
240
241         cfs_spin_lock(&conn->ibc_lock);
242
243         tx = kiblnd_find_waiting_tx_locked(conn, txtype, cookie);
244         if (tx == NULL) {
245                 cfs_spin_unlock(&conn->ibc_lock);
246
247                 CWARN("Unmatched completion type %x cookie "LPX64" from %s\n",
248                       txtype, cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
249                 kiblnd_close_conn(conn, -EPROTO);
250                 return;
251         }
252
253         if (tx->tx_status == 0) {               /* success so far */
254                 if (status < 0) {               /* failed? */
255                         tx->tx_status = status;
256                 } else if (txtype == IBLND_MSG_GET_REQ) {
257                         lnet_set_reply_msg_len(ni, tx->tx_lntmsg[1], status);
258                 }
259         }
260
261         tx->tx_waiting = 0;
262
263         idle = !tx->tx_queued && (tx->tx_sending == 0);
264         if (idle)
265                 cfs_list_del(&tx->tx_list);
266
267         cfs_spin_unlock(&conn->ibc_lock);
268
269         if (idle)
270                 kiblnd_tx_done(ni, tx);
271 }
272
273 void
274 kiblnd_send_completion (kib_conn_t *conn, int type, int status, __u64 cookie)
275 {
276         lnet_ni_t   *ni = conn->ibc_peer->ibp_ni;
277         kib_tx_t    *tx = kiblnd_get_idle_tx(ni);
278
279         if (tx == NULL) {
280                 CERROR("Can't get tx for completion %x for %s\n",
281                        type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
282                 return;
283         }
284
285         tx->tx_msg->ibm_u.completion.ibcm_status = status;
286         tx->tx_msg->ibm_u.completion.ibcm_cookie = cookie;
287         kiblnd_init_tx_msg(ni, tx, type, sizeof(kib_completion_msg_t));
288
289         kiblnd_queue_tx(tx, conn);
290 }
291
292 void
293 kiblnd_handle_rx (kib_rx_t *rx)
294 {
295         kib_msg_t    *msg = rx->rx_msg;
296         kib_conn_t   *conn = rx->rx_conn;
297         lnet_ni_t    *ni = conn->ibc_peer->ibp_ni;
298         int           credits = msg->ibm_credits;
299         kib_tx_t     *tx;
300         int           rc = 0;
301         int           rc2;
302         int           post_credit;
303
304         LASSERT (conn->ibc_state >= IBLND_CONN_ESTABLISHED);
305
306         CDEBUG (D_NET, "Received %x[%d] from %s\n",
307                 msg->ibm_type, credits,
308                 libcfs_nid2str(conn->ibc_peer->ibp_nid));
309
310         if (credits != 0) {
311                 /* Have I received credits that will let me send? */
312                 cfs_spin_lock(&conn->ibc_lock);
313
314                 if (conn->ibc_credits + credits >
315                     IBLND_MSG_QUEUE_SIZE(conn->ibc_version)) {
316                         rc2 = conn->ibc_credits;
317                         cfs_spin_unlock(&conn->ibc_lock);
318
319                         CERROR("Bad credits from %s: %d + %d > %d\n",
320                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
321                                rc2, credits,
322                                IBLND_MSG_QUEUE_SIZE(conn->ibc_version));
323
324                         kiblnd_close_conn(conn, -EPROTO);
325                         kiblnd_post_rx(rx, IBLND_POSTRX_NO_CREDIT);
326                         return;
327                 }
328
329                 conn->ibc_credits += credits;
330
331                 /* This ensures the credit taken by NOOP can be returned */
332                 if (msg->ibm_type == IBLND_MSG_NOOP &&
333                     !IBLND_OOB_CAPABLE(conn->ibc_version)) /* v1 only */
334                         conn->ibc_outstanding_credits++;
335
336                 cfs_spin_unlock(&conn->ibc_lock);
337                 kiblnd_check_sends(conn);
338         }
339
340         switch (msg->ibm_type) {
341         default:
342                 CERROR("Bad IBLND message type %x from %s\n",
343                        msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
344                 post_credit = IBLND_POSTRX_NO_CREDIT;
345                 rc = -EPROTO;
346                 break;
347
348         case IBLND_MSG_NOOP:
349                 if (IBLND_OOB_CAPABLE(conn->ibc_version)) {
350                         post_credit = IBLND_POSTRX_NO_CREDIT;
351                         break;
352                 }
353
354                 if (credits != 0) /* credit already posted */
355                         post_credit = IBLND_POSTRX_NO_CREDIT;
356                 else              /* a keepalive NOOP */
357                         post_credit = IBLND_POSTRX_PEER_CREDIT;
358                 break;
359
360         case IBLND_MSG_IMMEDIATE:
361                 post_credit = IBLND_POSTRX_DONT_POST;
362                 rc = lnet_parse(ni, &msg->ibm_u.immediate.ibim_hdr,
363                                 msg->ibm_srcnid, rx, 0);
364                 if (rc < 0)                     /* repost on error */
365                         post_credit = IBLND_POSTRX_PEER_CREDIT;
366                 break;
367
368         case IBLND_MSG_PUT_REQ:
369                 post_credit = IBLND_POSTRX_DONT_POST;
370                 rc = lnet_parse(ni, &msg->ibm_u.putreq.ibprm_hdr,
371                                 msg->ibm_srcnid, rx, 1);
372                 if (rc < 0)                     /* repost on error */
373                         post_credit = IBLND_POSTRX_PEER_CREDIT;
374                 break;
375
376         case IBLND_MSG_PUT_NAK:
377                 CWARN ("PUT_NACK from %s\n",
378                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
379                 post_credit = IBLND_POSTRX_RSRVD_CREDIT;
380                 kiblnd_handle_completion(conn, IBLND_MSG_PUT_REQ,
381                                          msg->ibm_u.completion.ibcm_status,
382                                          msg->ibm_u.completion.ibcm_cookie);
383                 break;
384
385         case IBLND_MSG_PUT_ACK:
386                 post_credit = IBLND_POSTRX_RSRVD_CREDIT;
387
388                 cfs_spin_lock(&conn->ibc_lock);
389                 tx = kiblnd_find_waiting_tx_locked(conn, IBLND_MSG_PUT_REQ,
390                                                    msg->ibm_u.putack.ibpam_src_cookie);
391                 if (tx != NULL)
392                         cfs_list_del(&tx->tx_list);
393                 cfs_spin_unlock(&conn->ibc_lock);
394
395                 if (tx == NULL) {
396                         CERROR("Unmatched PUT_ACK from %s\n",
397                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
398                         rc = -EPROTO;
399                         break;
400                 }
401
402                 LASSERT (tx->tx_waiting);
403                 /* CAVEAT EMPTOR: I could be racing with tx_complete, but...
404                  * (a) I can overwrite tx_msg since my peer has received it!
405                  * (b) tx_waiting set tells tx_complete() it's not done. */
406
407                 tx->tx_nwrq = 0;                /* overwrite PUT_REQ */
408
409                 rc2 = kiblnd_init_rdma(conn, tx, IBLND_MSG_PUT_DONE,
410                                        kiblnd_rd_size(&msg->ibm_u.putack.ibpam_rd),
411                                        &msg->ibm_u.putack.ibpam_rd,
412                                        msg->ibm_u.putack.ibpam_dst_cookie);
413                 if (rc2 < 0)
414                         CERROR("Can't setup rdma for PUT to %s: %d\n",
415                                libcfs_nid2str(conn->ibc_peer->ibp_nid), rc2);
416
417                 cfs_spin_lock(&conn->ibc_lock);
418                 tx->tx_waiting = 0;             /* clear waiting and queue atomically */
419                 kiblnd_queue_tx_locked(tx, conn);
420                 cfs_spin_unlock(&conn->ibc_lock);
421                 break;
422
423         case IBLND_MSG_PUT_DONE:
424                 post_credit = IBLND_POSTRX_PEER_CREDIT;
425                 kiblnd_handle_completion(conn, IBLND_MSG_PUT_ACK,
426                                          msg->ibm_u.completion.ibcm_status,
427                                          msg->ibm_u.completion.ibcm_cookie);
428                 break;
429
430         case IBLND_MSG_GET_REQ:
431                 post_credit = IBLND_POSTRX_DONT_POST;
432                 rc = lnet_parse(ni, &msg->ibm_u.get.ibgm_hdr,
433                                 msg->ibm_srcnid, rx, 1);
434                 if (rc < 0)                     /* repost on error */
435                         post_credit = IBLND_POSTRX_PEER_CREDIT;
436                 break;
437
438         case IBLND_MSG_GET_DONE:
439                 post_credit = IBLND_POSTRX_RSRVD_CREDIT;
440                 kiblnd_handle_completion(conn, IBLND_MSG_GET_REQ,
441                                          msg->ibm_u.completion.ibcm_status,
442                                          msg->ibm_u.completion.ibcm_cookie);
443                 break;
444         }
445
446         if (rc < 0)                             /* protocol error */
447                 kiblnd_close_conn(conn, rc);
448
449         if (post_credit != IBLND_POSTRX_DONT_POST)
450                 kiblnd_post_rx(rx, post_credit);
451 }
452
453 void
454 kiblnd_rx_complete (kib_rx_t *rx, int status, int nob)
455 {
456         kib_msg_t    *msg = rx->rx_msg;
457         kib_conn_t   *conn = rx->rx_conn;
458         lnet_ni_t    *ni = conn->ibc_peer->ibp_ni;
459         kib_net_t    *net = ni->ni_data;
460         int           rc;
461         int           err = -EIO;
462
463         LASSERT (net != NULL);
464         LASSERT (rx->rx_nob < 0);               /* was posted */
465         rx->rx_nob = 0;                         /* isn't now */
466
467         if (conn->ibc_state > IBLND_CONN_ESTABLISHED)
468                 goto ignore;
469
470         if (status != IB_WC_SUCCESS) {
471                 CNETERR("Rx from %s failed: %d\n",
472                         libcfs_nid2str(conn->ibc_peer->ibp_nid), status);
473                 goto failed;
474         }
475
476         LASSERT (nob >= 0);
477         rx->rx_nob = nob;
478
479         rc = kiblnd_unpack_msg(msg, rx->rx_nob);
480         if (rc != 0) {
481                 CERROR ("Error %d unpacking rx from %s\n",
482                         rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
483                 goto failed;
484         }
485
486         if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
487             msg->ibm_dstnid != ni->ni_nid ||
488             msg->ibm_srcstamp != conn->ibc_incarnation ||
489             msg->ibm_dststamp != net->ibn_incarnation) {
490                 CERROR ("Stale rx from %s\n",
491                         libcfs_nid2str(conn->ibc_peer->ibp_nid));
492                 err = -ESTALE;
493                 goto failed;
494         }
495
496         /* set time last known alive */
497         kiblnd_peer_alive(conn->ibc_peer);
498
499         /* racing with connection establishment/teardown! */
500
501         if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
502                 cfs_rwlock_t  *g_lock = &kiblnd_data.kib_global_lock;
503                 unsigned long  flags;
504
505                 cfs_write_lock_irqsave(g_lock, flags);
506                 /* must check holding global lock to eliminate race */
507                 if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
508                         cfs_list_add_tail(&rx->rx_list, &conn->ibc_early_rxs);
509                         cfs_write_unlock_irqrestore(g_lock, flags);
510                         return;
511                 }
512                 cfs_write_unlock_irqrestore(g_lock, flags);
513         }
514         kiblnd_handle_rx(rx);
515         return;
516
517  failed:
518         CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
519         kiblnd_close_conn(conn, err);
520  ignore:
521         kiblnd_drop_rx(rx);                     /* Don't re-post rx. */
522 }
523
524 struct page *
525 kiblnd_kvaddr_to_page (unsigned long vaddr)
526 {
527         struct page *page;
528
529         if (vaddr >= VMALLOC_START &&
530             vaddr < VMALLOC_END) {
531                 page = vmalloc_to_page ((void *)vaddr);
532                 LASSERT (page != NULL);
533                 return page;
534         }
535 #ifdef CONFIG_HIGHMEM
536         if (vaddr >= PKMAP_BASE &&
537             vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE)) {
538                 /* No highmem pages only used for bulk (kiov) I/O */
539                 CERROR("find page for address in highmem\n");
540                 LBUG();
541         }
542 #endif
543         page = virt_to_page (vaddr);
544         LASSERT (page != NULL);
545         return page;
546 }
547
548 static int
549 kiblnd_fmr_map_tx(kib_net_t *net, kib_tx_t *tx, kib_rdma_desc_t *rd, int nob)
550 {
551         kib_hca_dev_t      *hdev  = tx->tx_pool->tpo_hdev;
552         __u64              *pages = tx->tx_pages;
553         int                 npages;
554         int                 size;
555         int                 rc;
556         int                 i;
557
558         for (i = 0, npages = 0; i < rd->rd_nfrags; i++) {
559                 for (size = 0; size <  rd->rd_frags[i].rf_nob;
560                                size += hdev->ibh_page_size) {
561                         pages[npages ++] = (rd->rd_frags[i].rf_addr &
562                                             hdev->ibh_page_mask) + size;
563                 }
564         }
565
566         rc = kiblnd_fmr_pool_map(&net->ibn_fmr_ps, pages, npages, 0, &tx->tx_u.fmr);
567         if (rc != 0) {
568                 CERROR ("Can't map %d pages: %d\n", npages, rc);
569                 return rc;
570         }
571
572         /* If rd is not tx_rd, it's going to get sent to a peer, who will need
573          * the rkey */
574         rd->rd_key = (rd != tx->tx_rd) ? tx->tx_u.fmr.fmr_pfmr->fmr->rkey :
575                                          tx->tx_u.fmr.fmr_pfmr->fmr->lkey;
576         rd->rd_frags[0].rf_addr &= ~hdev->ibh_page_mask;
577         rd->rd_frags[0].rf_nob   = nob;
578         rd->rd_nfrags = 1;
579
580         return 0;
581 }
582
583 static int
584 kiblnd_pmr_map_tx(kib_net_t *net, kib_tx_t *tx, kib_rdma_desc_t *rd, int nob)
585 {
586         kib_hca_dev_t *hdev = tx->tx_pool->tpo_hdev;
587         __u64   iova;
588         int     rc;
589
590         iova = rd->rd_frags[0].rf_addr & ~hdev->ibh_page_mask;
591
592         rc = kiblnd_pmr_pool_map(&net->ibn_pmr_ps, hdev, rd, &iova, &tx->tx_u.pmr);
593         if (rc != 0) {
594                 CERROR("Failed to create MR by phybuf: %d\n", rc);
595                 return rc;
596         }
597
598         /* If rd is not tx_rd, it's going to get sent to a peer, who will need
599          * the rkey */
600         rd->rd_key = (rd != tx->tx_rd) ? tx->tx_u.pmr->pmr_mr->rkey :
601                                          tx->tx_u.pmr->pmr_mr->lkey;
602         rd->rd_nfrags = 1;
603         rd->rd_frags[0].rf_addr = iova;
604         rd->rd_frags[0].rf_nob  = nob;
605
606         return 0;
607 }
608
609 void
610 kiblnd_unmap_tx(lnet_ni_t *ni, kib_tx_t *tx)
611 {
612         kib_net_t  *net = ni->ni_data;
613
614         LASSERT (net != NULL);
615
616         if (net->ibn_with_fmr && tx->tx_u.fmr.fmr_pfmr != NULL) {
617                 kiblnd_fmr_pool_unmap(&tx->tx_u.fmr, tx->tx_status);
618                 tx->tx_u.fmr.fmr_pfmr = NULL;
619         } else if (net->ibn_with_pmr && tx->tx_u.pmr != NULL) {
620                 kiblnd_pmr_pool_unmap(tx->tx_u.pmr);
621                 tx->tx_u.pmr = NULL;
622         }
623
624         if (tx->tx_nfrags != 0) {
625                 kiblnd_dma_unmap_sg(tx->tx_pool->tpo_hdev->ibh_ibdev,
626                                     tx->tx_frags, tx->tx_nfrags, tx->tx_dmadir);
627                 tx->tx_nfrags = 0;
628         }
629 }
630
631 int
632 kiblnd_map_tx(lnet_ni_t *ni, kib_tx_t *tx,
633               kib_rdma_desc_t *rd, int nfrags)
634 {
635         kib_hca_dev_t      *hdev  = tx->tx_pool->tpo_hdev;
636         kib_net_t          *net   = ni->ni_data;
637         struct ib_mr       *mr    = NULL;
638         __u32               nob;
639         int                 i;
640
641         /* If rd is not tx_rd, it's going to get sent to a peer and I'm the
642          * RDMA sink */
643         tx->tx_dmadir = (rd != tx->tx_rd) ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
644         tx->tx_nfrags = nfrags;
645
646         rd->rd_nfrags =
647                 kiblnd_dma_map_sg(hdev->ibh_ibdev,
648                                   tx->tx_frags, tx->tx_nfrags, tx->tx_dmadir);
649
650         for (i = 0, nob = 0; i < rd->rd_nfrags; i++) {
651                 rd->rd_frags[i].rf_nob  = kiblnd_sg_dma_len(
652                         hdev->ibh_ibdev, &tx->tx_frags[i]);
653                 rd->rd_frags[i].rf_addr = kiblnd_sg_dma_address(
654                         hdev->ibh_ibdev, &tx->tx_frags[i]);
655                 nob += rd->rd_frags[i].rf_nob;
656         }
657
658         /* looking for pre-mapping MR */
659         mr = kiblnd_find_rd_dma_mr(hdev, rd);
660         if (mr != NULL) {
661                 /* found pre-mapping MR */
662                 rd->rd_key = (rd != tx->tx_rd) ? mr->rkey : mr->lkey;
663                 return 0;
664         }
665
666         if (net->ibn_with_fmr)
667                 return kiblnd_fmr_map_tx(net, tx, rd, nob);
668         else if (net->ibn_with_pmr)
669                 return kiblnd_pmr_map_tx(net, tx, rd, nob);
670
671         return -EINVAL;
672 }
673
674
675 int
676 kiblnd_setup_rd_iov(lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd,
677                     unsigned int niov, struct iovec *iov, int offset, int nob)
678 {
679         kib_net_t          *net = ni->ni_data;
680         struct page        *page;
681         struct scatterlist *sg;
682         unsigned long       vaddr;
683         int                 fragnob;
684         int                 page_offset;
685
686         LASSERT (nob > 0);
687         LASSERT (niov > 0);
688         LASSERT (net != NULL);
689
690         while (offset >= iov->iov_len) {
691                 offset -= iov->iov_len;
692                 niov--;
693                 iov++;
694                 LASSERT (niov > 0);
695         }
696
697         sg = tx->tx_frags;
698         do {
699                 LASSERT (niov > 0);
700
701                 vaddr = ((unsigned long)iov->iov_base) + offset;
702                 page_offset = vaddr & (PAGE_SIZE - 1);
703                 page = kiblnd_kvaddr_to_page(vaddr);
704                 if (page == NULL) {
705                         CERROR ("Can't find page\n");
706                         return -EFAULT;
707                 }
708
709                 fragnob = min((int)(iov->iov_len - offset), nob);
710                 fragnob = min(fragnob, (int)PAGE_SIZE - page_offset);
711
712                 sg_set_page(sg, page, fragnob, page_offset);
713                 sg++;
714
715                 if (offset + fragnob < iov->iov_len) {
716                         offset += fragnob;
717                 } else {
718                         offset = 0;
719                         iov++;
720                         niov--;
721                 }
722                 nob -= fragnob;
723         } while (nob > 0);
724
725         return kiblnd_map_tx(ni, tx, rd, sg - tx->tx_frags);
726 }
727
728 int
729 kiblnd_setup_rd_kiov (lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd,
730                       int nkiov, lnet_kiov_t *kiov, int offset, int nob)
731 {
732         kib_net_t          *net = ni->ni_data;
733         struct scatterlist *sg;
734         int                 fragnob;
735
736         CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
737
738         LASSERT (nob > 0);
739         LASSERT (nkiov > 0);
740         LASSERT (net != NULL);
741
742         while (offset >= kiov->kiov_len) {
743                 offset -= kiov->kiov_len;
744                 nkiov--;
745                 kiov++;
746                 LASSERT (nkiov > 0);
747         }
748
749         sg = tx->tx_frags;
750         do {
751                 LASSERT (nkiov > 0);
752
753                 fragnob = min((int)(kiov->kiov_len - offset), nob);
754
755                 memset(sg, 0, sizeof(*sg));
756                 sg_set_page(sg, kiov->kiov_page, fragnob,
757                             kiov->kiov_offset + offset);
758                 sg++;
759
760                 offset = 0;
761                 kiov++;
762                 nkiov--;
763                 nob -= fragnob;
764         } while (nob > 0);
765
766         return kiblnd_map_tx(ni, tx, rd, sg - tx->tx_frags);
767 }
768
769 int
770 kiblnd_post_tx_locked (kib_conn_t *conn, kib_tx_t *tx, int credit)
771 {
772         kib_msg_t         *msg = tx->tx_msg;
773         kib_peer_t        *peer = conn->ibc_peer;
774         int                ver = conn->ibc_version;
775         int                rc;
776         int                done;
777         struct ib_send_wr *bad_wrq;
778
779         LASSERT (tx->tx_queued);
780         /* We rely on this for QP sizing */
781         LASSERT (tx->tx_nwrq > 0);
782         LASSERT (tx->tx_nwrq <= 1 + IBLND_RDMA_FRAGS(ver));
783
784         LASSERT (credit == 0 || credit == 1);
785         LASSERT (conn->ibc_outstanding_credits >= 0);
786         LASSERT (conn->ibc_outstanding_credits <= IBLND_MSG_QUEUE_SIZE(ver));
787         LASSERT (conn->ibc_credits >= 0);
788         LASSERT (conn->ibc_credits <= IBLND_MSG_QUEUE_SIZE(ver));
789
790         if (conn->ibc_nsends_posted == IBLND_CONCURRENT_SENDS(ver)) {
791                 /* tx completions outstanding... */
792                 CDEBUG(D_NET, "%s: posted enough\n",
793                        libcfs_nid2str(peer->ibp_nid));
794                 return -EAGAIN;
795         }
796
797         if (credit != 0 && conn->ibc_credits == 0) {   /* no credits */
798                 CDEBUG(D_NET, "%s: no credits\n",
799                        libcfs_nid2str(peer->ibp_nid));
800                 return -EAGAIN;
801         }
802
803         if (credit != 0 && !IBLND_OOB_CAPABLE(ver) &&
804             conn->ibc_credits == 1 &&   /* last credit reserved */
805             msg->ibm_type != IBLND_MSG_NOOP) {      /* for NOOP */
806                 CDEBUG(D_NET, "%s: not using last credit\n",
807                        libcfs_nid2str(peer->ibp_nid));
808                 return -EAGAIN;
809         }
810
811         /* NB don't drop ibc_lock before bumping tx_sending */
812         cfs_list_del(&tx->tx_list);
813         tx->tx_queued = 0;
814
815         if (msg->ibm_type == IBLND_MSG_NOOP &&
816             (!kiblnd_need_noop(conn) ||     /* redundant NOOP */
817              (IBLND_OOB_CAPABLE(ver) && /* posted enough NOOP */
818               conn->ibc_noops_posted == IBLND_OOB_MSGS(ver)))) {
819                 /* OK to drop when posted enough NOOPs, since
820                  * kiblnd_check_sends will queue NOOP again when
821                  * posted NOOPs complete */
822                 cfs_spin_unlock(&conn->ibc_lock);
823                 kiblnd_tx_done(peer->ibp_ni, tx);
824                 cfs_spin_lock(&conn->ibc_lock);
825                 CDEBUG(D_NET, "%s(%d): redundant or enough NOOP\n",
826                        libcfs_nid2str(peer->ibp_nid),
827                        conn->ibc_noops_posted);
828                 return 0;
829         }
830
831         kiblnd_pack_msg(peer->ibp_ni, msg, ver, conn->ibc_outstanding_credits,
832                         peer->ibp_nid, conn->ibc_incarnation);
833
834         conn->ibc_credits -= credit;
835         conn->ibc_outstanding_credits = 0;
836         conn->ibc_nsends_posted++;
837         if (msg->ibm_type == IBLND_MSG_NOOP)
838                 conn->ibc_noops_posted++;
839
840         /* CAVEAT EMPTOR!  This tx could be the PUT_DONE of an RDMA
841          * PUT.  If so, it was first queued here as a PUT_REQ, sent and
842          * stashed on ibc_active_txs, matched by an incoming PUT_ACK,
843          * and then re-queued here.  It's (just) possible that
844          * tx_sending is non-zero if we've not done the tx_complete()
845          * from the first send; hence the ++ rather than = below. */
846         tx->tx_sending++;
847         cfs_list_add(&tx->tx_list, &conn->ibc_active_txs);
848
849         /* I'm still holding ibc_lock! */
850         if (conn->ibc_state != IBLND_CONN_ESTABLISHED) {
851                 rc = -ECONNABORTED;
852         } else if (tx->tx_pool->tpo_pool.po_failed ||
853                  conn->ibc_hdev != tx->tx_pool->tpo_hdev) {
854                 /* close_conn will launch failover */
855                 rc = -ENETDOWN;
856         } else {
857                 rc = ib_post_send(conn->ibc_cmid->qp,
858                                   tx->tx_wrq, &bad_wrq);
859         }
860
861         conn->ibc_last_send = jiffies;
862
863         if (rc == 0)
864                 return 0;
865
866         /* NB credits are transferred in the actual
867          * message, which can only be the last work item */
868         conn->ibc_credits += credit;
869         conn->ibc_outstanding_credits += msg->ibm_credits;
870         conn->ibc_nsends_posted--;
871         if (msg->ibm_type == IBLND_MSG_NOOP)
872                 conn->ibc_noops_posted--;
873
874         tx->tx_status = rc;
875         tx->tx_waiting = 0;
876         tx->tx_sending--;
877
878         done = (tx->tx_sending == 0);
879         if (done)
880                 cfs_list_del(&tx->tx_list);
881
882         cfs_spin_unlock(&conn->ibc_lock);
883
884         if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
885                 CERROR("Error %d posting transmit to %s\n",
886                        rc, libcfs_nid2str(peer->ibp_nid));
887         else
888                 CDEBUG(D_NET, "Error %d posting transmit to %s\n",
889                        rc, libcfs_nid2str(peer->ibp_nid));
890
891         kiblnd_close_conn(conn, rc);
892
893         if (done)
894                 kiblnd_tx_done(peer->ibp_ni, tx);
895
896         cfs_spin_lock(&conn->ibc_lock);
897
898         return -EIO;
899 }
900
/*
 * Post as many of conn's queued transmits as kiblnd_post_tx_locked() will
 * accept.
 *
 * Returns immediately (after a debug message) while conn->ibc_state is
 * below IBLND_CONN_ESTABLISHED.  Otherwise, with conn->ibc_lock held
 * (taken and released inside this function):
 *  - moves txs from ibc_tx_queue_rsrvd to the tail of ibc_tx_queue, using
 *    up one ibc_reserved_credits per tx moved;
 *  - queues a NOOP message if kiblnd_need_noop() asks for one and an idle
 *    tx can be obtained;
 *  - posts from ibc_tx_queue_nocred, then ibc_tx_noops, then ibc_tx_queue
 *    until all three are empty or kiblnd_post_tx_locked() returns non-zero.
 */
901 void
902 kiblnd_check_sends (kib_conn_t *conn)
903 {
904         int        ver = conn->ibc_version;
905         lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
906         kib_tx_t  *tx;
907
908         /* Don't send anything until after the connection is established */
909         if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
910                 CDEBUG(D_NET, "%s too soon\n",
911                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
912                 return;
913         }
914
915         cfs_spin_lock(&conn->ibc_lock);
916
917         LASSERT (conn->ibc_nsends_posted <= IBLND_CONCURRENT_SENDS(ver));
918         LASSERT (!IBLND_OOB_CAPABLE(ver) ||
919                  conn->ibc_noops_posted <= IBLND_OOB_MSGS(ver));
920         LASSERT (conn->ibc_reserved_credits >= 0);
921
            /* Each reserved credit promotes one tx from the reserved queue
             * to the tail of the normal credit-consuming queue. */
922         while (conn->ibc_reserved_credits > 0 &&
923                !cfs_list_empty(&conn->ibc_tx_queue_rsrvd)) {
924                 tx = cfs_list_entry(conn->ibc_tx_queue_rsrvd.next,
925                                     kib_tx_t, tx_list);
926                 cfs_list_del(&tx->tx_list);
927                 cfs_list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
928                 conn->ibc_reserved_credits--;
929         }
930
            /* ibc_lock is dropped while the NOOP tx is obtained and
             * initialised, and retaken before it is queued.  If no idle tx
             * is available the NOOP is simply not queued on this pass. */
931         if (kiblnd_need_noop(conn)) {
932                 cfs_spin_unlock(&conn->ibc_lock);
933
934                 tx = kiblnd_get_idle_tx(ni);
935                 if (tx != NULL)
936                         kiblnd_init_tx_msg(ni, tx, IBLND_MSG_NOOP, 0);
937
938                 cfs_spin_lock(&conn->ibc_lock);
939                 if (tx != NULL)
940                         kiblnd_queue_tx_locked(tx, conn);
941         }
942
943         kiblnd_conn_addref(conn); /* 1 ref for me.... (see b21911) */
944
            /* Pick the head of the first non-empty queue, in priority order:
             * ibc_tx_queue_nocred (posted with credit 0), ibc_tx_noops
             * (credit 1; asserted to be used only when the version is not
             * OOB capable), then ibc_tx_queue (credit 1). */
945         for (;;) {
946                 int credit;
947
948                 if (!cfs_list_empty(&conn->ibc_tx_queue_nocred)) {
949                         credit = 0;
950                         tx = cfs_list_entry(conn->ibc_tx_queue_nocred.next,
951                                             kib_tx_t, tx_list);
952                 } else if (!cfs_list_empty(&conn->ibc_tx_noops)) {
953                         LASSERT (!IBLND_OOB_CAPABLE(ver));
954                         credit = 1;
955                         tx = cfs_list_entry(conn->ibc_tx_noops.next,
956                                         kib_tx_t, tx_list);
957                 } else if (!cfs_list_empty(&conn->ibc_tx_queue)) {
958                         credit = 1;
959                         tx = cfs_list_entry(conn->ibc_tx_queue.next,
960                                             kib_tx_t, tx_list);
961                 } else
962                         break;
963
                    /* Any non-zero return stops this pass; the tx itself is
                     * unlinked from its queue by kiblnd_post_tx_locked(). */
964                 if (kiblnd_post_tx_locked(conn, tx, credit) != 0)
965                         break;
966         }
967
968         cfs_spin_unlock(&conn->ibc_lock);
969
970         kiblnd_conn_decref(conn); /* ...until here */
971 }
972
/*
 * Account for the completion of one posted send of 'tx'.
 *
 * 'status' is compared with IB_WC_SUCCESS.  Any other value is treated as
 * failure: the connection is closed with -EIO (after logging, if it is
 * still in state IBLND_CONN_ESTABLISHED), tx_waiting is cleared and
 * tx_status is set to -EIO.  On success kiblnd_peer_alive() is called.
 *
 * Under conn->ibc_lock the per-tx and per-conn send counters are dropped
 * and, if the tx has become idle, it is unlinked from its list.  The idle
 * tx is then handed to kiblnd_tx_done() outside the lock, and
 * kiblnd_check_sends() is called.
 */
973 void
974 kiblnd_tx_complete (kib_tx_t *tx, int status)
975 {
976         int           failed = (status != IB_WC_SUCCESS);
977         kib_conn_t   *conn = tx->tx_conn;
978         int           idle;
979
980         LASSERT (tx->tx_sending > 0);
981
982         if (failed) {
983                 if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
984                         CNETERR("Tx -> %s cookie "LPX64
985                                 " sending %d waiting %d: failed %d\n",
986                                 libcfs_nid2str(conn->ibc_peer->ibp_nid),
987                                 tx->tx_cookie, tx->tx_sending, tx->tx_waiting,
988                                 status);
989
990                 kiblnd_close_conn(conn, -EIO);
991         } else {
992                 kiblnd_peer_alive(conn->ibc_peer);
993         }
994
995         cfs_spin_lock(&conn->ibc_lock);
996
997         /* I could be racing with rdma completion.  Whoever makes 'tx' idle
998          * gets to free it, which also drops its ref on 'conn'. */
999
1000         tx->tx_sending--;
1001         conn->ibc_nsends_posted--;
1002         if (tx->tx_msg->ibm_type == IBLND_MSG_NOOP)
1003                 conn->ibc_noops_posted--;
1004
1005         if (failed) {
1006                 tx->tx_waiting = 0;             /* don't wait for peer */
1007                 tx->tx_status = -EIO;
1008         }
1009
1010         idle = (tx->tx_sending == 0) &&         /* This is the final callback */
1011                !tx->tx_waiting &&               /* Not waiting for peer */
1012                !tx->tx_queued;                  /* Not re-queued (PUT_DONE) */
1013         if (idle)
1014                 cfs_list_del(&tx->tx_list);
1015
             /* The ref is taken before the lock is dropped and held across
              * kiblnd_tx_done() and kiblnd_check_sends() below. */
1016         kiblnd_conn_addref(conn);               /* 1 ref for me.... */
1017
1018         cfs_spin_unlock(&conn->ibc_lock);
1019
1020         if (idle)
1021                 kiblnd_tx_done(conn->ibc_peer->ibp_ni, tx);
1022
1023         kiblnd_check_sends(conn);
1024
1025         kiblnd_conn_decref(conn);               /* ...until here */
1026 }
1027
/*
 * Initialise tx's message and append the SEND work request that carries it.
 *
 * The message in tx->tx_msg is set up with kiblnd_init_msg(type, body_nob).
 * The sge and work request at index tx->tx_nwrq are then filled in to send
 * offsetof(kib_msg_t, ibm_u) + body_nob bytes starting at tx->tx_msgaddr as
 * a signaled IB_WR_SEND that terminates the chain (next == NULL), and
 * tx->tx_nwrq is incremented.
 *
 * 'ni' is not referenced in this function.
 */
1028 void
1029 kiblnd_init_tx_msg (lnet_ni_t *ni, kib_tx_t *tx, int type, int body_nob)
1030 {
1031         kib_hca_dev_t     *hdev = tx->tx_pool->tpo_hdev;
1032         struct ib_sge     *sge = &tx->tx_sge[tx->tx_nwrq];
1033         struct ib_send_wr *wrq = &tx->tx_wrq[tx->tx_nwrq];
1034         int                nob = offsetof (kib_msg_t, ibm_u) + body_nob;
1035         struct ib_mr      *mr;
1036
             /* The slot index must lie in [0, IBLND_MAX_RDMA_FRAGS] and the
              * whole message must fit in IBLND_MSG_SIZE bytes. */
1037         LASSERT (tx->tx_nwrq >= 0);
1038         LASSERT (tx->tx_nwrq < IBLND_MAX_RDMA_FRAGS + 1);
1039         LASSERT (nob <= IBLND_MSG_SIZE);
1040
1041         kiblnd_init_msg(tx->tx_msg, type, body_nob);
1042
             /* A memory region covering the message buffer is required; its
              * lkey goes into the sge. */
1043         mr = kiblnd_find_dma_mr(hdev, tx->tx_msgaddr, nob);
1044         LASSERT (mr != NULL);
1045
1046         sge->lkey   = mr->lkey;
1047         sge->addr   = tx->tx_msgaddr;
1048         sge->length = nob;
1049
1050         memset(wrq, 0, sizeof(*wrq));
1051
1052         wrq->next       = NULL;
1053         wrq->wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_TX);
1054         wrq->sg_list    = sge;
1055         wrq->num_sge    = 1;
1056         wrq->opcode     = IB_WR_SEND;
1057         wrq->send_flags = IB_SEND_SIGNALED;
1058
1059         tx->tx_nwrq++;
1060 }
1061
/*
 * Set up the RDMA-write work requests that move 'resid' bytes from the
 * source descriptor tx->tx_rd to the destination descriptor 'dstrd', and
 * then append a completion message of 'type' (asserted to be
 * IBLND_MSG_GET_DONE or IBLND_MSG_PUT_DONE) carrying 'dstcookie'.
 *
 * One work request is built per contiguous piece, where a piece is limited
 * by the current source fragment, the current destination fragment and the
 * bytes still to move.
 *
 * Returns the original value of 'resid' on success, or a negative errno:
 *   -EPROTO    source or destination fragments ran out before 'resid' did;
 *   -EMSGSIZE  more than IBLND_RDMA_FRAGS(conn->ibc_version) work requests
 *              would be needed.
 * The return value is also stored in the completion message's ibcm_status.
 * On failure tx->tx_nwrq is reset to 0 first, so only the completion
 * message's work request is set up on the tx.
 */
1062 int
1063 kiblnd_init_rdma (kib_conn_t *conn, kib_tx_t *tx, int type,
1064                   int resid, kib_rdma_desc_t *dstrd, __u64 dstcookie)
1065 {
1066         kib_msg_t         *ibmsg = tx->tx_msg;
1067         kib_rdma_desc_t   *srcrd = tx->tx_rd;
1068         struct ib_sge     *sge = &tx->tx_sge[0];
1069         struct ib_send_wr *wrq = &tx->tx_wrq[0];
1070         int                rc  = resid;
1071         int                srcidx;
1072         int                dstidx;
1073         int                wrknob;
1074
1075         LASSERT (!cfs_in_interrupt());
1076         LASSERT (tx->tx_nwrq == 0);
1077         LASSERT (type == IBLND_MSG_GET_DONE ||
1078                  type == IBLND_MSG_PUT_DONE);
1079
1080         srcidx = dstidx = 0;
1081
1082         while (resid > 0) {
1083                 if (srcidx >= srcrd->rd_nfrags) {
1084                         CERROR("Src buffer exhausted: %d frags\n", srcidx);
1085                         rc = -EPROTO;
1086                         break;
1087                 }
1088
1089                 if (dstidx == dstrd->rd_nfrags) {
1090                         CERROR("Dst buffer exhausted: %d frags\n", dstidx);
1091                         rc = -EPROTO;
1092                         break;
1093                 }
1094
1095                 if (tx->tx_nwrq == IBLND_RDMA_FRAGS(conn->ibc_version)) {
1096                         CERROR("RDMA too fragmented for %s (%d): "
1097                                "%d/%d src %d/%d dst frags\n",
1098                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
1099                                IBLND_RDMA_FRAGS(conn->ibc_version),
1100                                srcidx, srcrd->rd_nfrags,
1101                                dstidx, dstrd->rd_nfrags);
1102                         rc = -EMSGSIZE;
1103                         break;
1104                 }
1105
                     /* Bytes for this work request: the smallest of the
                      * current source fragment size, the current destination
                      * fragment size and the bytes still outstanding. */
1106                 wrknob = MIN(MIN(kiblnd_rd_frag_size(srcrd, srcidx),
1107                                  kiblnd_rd_frag_size(dstrd, dstidx)), resid);
1108
1109                 sge = &tx->tx_sge[tx->tx_nwrq];
1110                 sge->addr   = kiblnd_rd_frag_addr(srcrd, srcidx);
1111                 sge->lkey   = kiblnd_rd_frag_key(srcrd, srcidx);
1112                 sge->length = wrknob;
1113
1114                 wrq = &tx->tx_wrq[tx->tx_nwrq];
1115
                     /* Chain to the next slot of tx_wrq[].  For the last RDMA
                      * request that slot is the one kiblnd_init_tx_msg()
                      * fills in below with the completion message. */
1116                 wrq->next       = wrq + 1;
1117                 wrq->wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_RDMA);
1118                 wrq->sg_list    = sge;
1119                 wrq->num_sge    = 1;
1120                 wrq->opcode     = IB_WR_RDMA_WRITE;
1121                 wrq->send_flags = 0;
1122
1123                 wrq->wr.rdma.remote_addr = kiblnd_rd_frag_addr(dstrd, dstidx);
1124                 wrq->wr.rdma.rkey        = kiblnd_rd_frag_key(dstrd, dstidx);
1125
                     /* The consume helpers return the fragment index to use
                      * for the next piece. */
1126                 srcidx = kiblnd_rd_consume_frag(srcrd, srcidx, wrknob);
1127                 dstidx = kiblnd_rd_consume_frag(dstrd, dstidx, wrknob);
1128
1129                 resid -= wrknob;
1130
1131                 tx->tx_nwrq++;
                     /* NOTE(review): wrq and sge are both recomputed from
                      * tx_nwrq at the top of the next iteration and are not
                      * read after the loop, so these two increments have no
                      * observable effect. */
1132                 wrq++;
1133                 sge++;
1134         }
1135
1136         if (rc < 0)                             /* no RDMA if completing with failure */
1137                 tx->tx_nwrq = 0;
1138
1139         ibmsg->ibm_u.completion.ibcm_status = rc;
1140         ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;
1141         kiblnd_init_tx_msg(conn->ibc_peer->ibp_ni, tx,
1142                            type, sizeof (kib_completion_msg_t));
1143
1144         return rc;
1145 }
1146
/*
 * Put 'tx' on the send queue of 'conn' that matches its message type.
 *
 * The callers visible in this file (kiblnd_queue_tx() and
 * kiblnd_check_sends()) hold conn->ibc_lock around this call.
 *
 * Marks the tx queued and sets its deadline to
 * jiffies + kib_timeout * CFS_HZ.  If the tx has no connection yet it is
 * attached to 'conn' and a conn ref is taken for it; a tx that already has
 * a connection must be a PUT_DONE on the same conn.
 *
 * Queue selection by tx->tx_msg->ibm_type:
 *   PUT_REQ, GET_REQ                      -> ibc_tx_queue_rsrvd
 *   PUT_NAK, PUT_ACK, PUT_DONE, GET_DONE  -> ibc_tx_queue_nocred
 *   NOOP                                  -> ibc_tx_queue_nocred if the
 *                                            conn version is OOB capable,
 *                                            otherwise ibc_tx_noops
 *   IMMEDIATE                             -> ibc_tx_queue
 *   anything else                         -> LBUG()
 */
1147 void
1148 kiblnd_queue_tx_locked (kib_tx_t *tx, kib_conn_t *conn)
1149 {
1150         cfs_list_t   *q;
1151
1152         LASSERT (tx->tx_nwrq > 0);              /* work items set up */
1153         LASSERT (!tx->tx_queued);               /* not queued for sending already */
1154         LASSERT (conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1155
1156         tx->tx_queued = 1;
1157         tx->tx_deadline = jiffies + (*kiblnd_tunables.kib_timeout * CFS_HZ);
1158
1159         if (tx->tx_conn == NULL) {
1160                 kiblnd_conn_addref(conn);
1161                 tx->tx_conn = conn;
1162                 LASSERT (tx->tx_msg->ibm_type != IBLND_MSG_PUT_DONE);
1163         } else {
1164                 /* PUT_DONE first attached to conn as a PUT_REQ */
1165                 LASSERT (tx->tx_conn == conn);
1166                 LASSERT (tx->tx_msg->ibm_type == IBLND_MSG_PUT_DONE);
1167         }
1168
1169         switch (tx->tx_msg->ibm_type) {
1170         default:
1171                 LBUG();
1172
1173         case IBLND_MSG_PUT_REQ:
1174         case IBLND_MSG_GET_REQ:
1175                 q = &conn->ibc_tx_queue_rsrvd;
1176                 break;
1177
1178         case IBLND_MSG_PUT_NAK:
1179         case IBLND_MSG_PUT_ACK:
1180         case IBLND_MSG_PUT_DONE:
1181         case IBLND_MSG_GET_DONE:
1182                 q = &conn->ibc_tx_queue_nocred;
1183                 break;
1184
1185         case IBLND_MSG_NOOP:
1186                 if (IBLND_OOB_CAPABLE(conn->ibc_version))
1187                         q = &conn->ibc_tx_queue_nocred;
1188                 else
1189                         q = &conn->ibc_tx_noops;
1190                 break;
1191
1192         case IBLND_MSG_IMMEDIATE:
1193                 q = &conn->ibc_tx_queue;
1194                 break;
1195         }
1196
1197         cfs_list_add_tail(&tx->tx_list, q);
1198 }
1199
/*
 * Queue 'tx' on 'conn' and try to send.
 *
 * Takes conn->ibc_lock around kiblnd_queue_tx_locked(), releases it, and
 * then calls kiblnd_check_sends(), which takes the lock again itself.
 */
1200 void
1201 kiblnd_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
1202 {
1203         cfs_spin_lock(&conn->ibc_lock);
1204         kiblnd_queue_tx_locked(tx, conn);
1205         cfs_spin_unlock(&conn->ibc_lock);
1206
1207         kiblnd_check_sends(conn);
1208 }
1209
/*
 * Start address resolution on 'cmid' using a privileged source port.
 *
 * Source ports are tried from PROT_SOCK-1 down to 1: each is written into
 * srcaddr->sin_port (network byte order) and passed to rdma_resolve_addr()
 * together with 'dstaddr' and 'timeout_ms'.
 *
 * When HAVE_OFED_RDMA_SET_REUSEADDR is defined, rdma_set_reuseaddr(cmid, 1)
 * is called first and its failure is returned immediately.
 *
 * Returns 0 as soon as one rdma_resolve_addr() call succeeds.
 * -EADDRINUSE and -EADDRNOTAVAIL move on to the next port; any other error
 * is returned at once.  If every port fails, the error from the last
 * attempt is returned.
 */
1210 static int kiblnd_resolve_addr(struct rdma_cm_id *cmid,
1211                                struct sockaddr_in *srcaddr,
1212                                struct sockaddr_in *dstaddr,
1213                                int timeout_ms)
1214 {
1215         unsigned short port;
1216         int rc;
1217
1218 #ifdef HAVE_OFED_RDMA_SET_REUSEADDR
1219         /* allow the port to be reused */
1220         rc = rdma_set_reuseaddr(cmid, 1);
1221         if (rc != 0) {
1222                 CERROR("Unable to set reuse on cmid: %d\n", rc);
1223                 return rc;
1224         }
1225 #endif
1226
1227         /* look for a free privileged port */
1228         for (port = PROT_SOCK-1; port > 0; port--) {
1229                 srcaddr->sin_port = htons(port);
1230                 rc = rdma_resolve_addr(cmid,
1231                                        (struct sockaddr *)srcaddr,
1232                                        (struct sockaddr *)dstaddr,
1233                                        timeout_ms);
1234                 if (rc == 0) {
1235                         CDEBUG(D_NET, "bound to port %hu\n", port);
1236                         return 0;
1237                 } else if (rc == -EADDRINUSE || rc == -EADDRNOTAVAIL) {
1238                         CDEBUG(D_NET, "bind to port %hu failed: %d\n",
1239                                port, rc);
1240                 } else {
1241                         return rc;
1242                 }
1243         }
1244
             /* Every port was tried; rc still holds the last bind error. */
1245         CERROR("Failed to bind to a free privileged port\n");
1246 #ifndef HAVE_OFED_RDMA_SET_REUSEADDR
1247         CERROR("You may need IB verbs that supports rdma_set_reuseaddr()\n");
1248 #endif
1249         return rc;
1250 }
1251
/*
 * Start an outgoing connection attempt to 'peer'.
 *
 * Creates an RDMA CM id with kiblnd_cm_callback as handler and 'peer' as
 * its context, then starts address resolution from the local device's IP
 * (net->ibn_dev->ibd_ifip) to the address part of the peer's NID on port
 * *kiblnd_tunables.kib_service, with a timeout of kib_timeout * 1000.
 * Resolution goes through kiblnd_resolve_addr() when kib_use_priv_port is
 * set, otherwise directly through rdma_resolve_addr() with source port 0.
 *
 * The caller must already have peer->ibp_connecting > 0 (asserted).
 *
 * Nothing is returned.  On success the function just returns; the rest of
 * the connection setup is presumably driven from kiblnd_cm_callback
 * (TODO confirm - not visible here).  On failure
 * kiblnd_peer_connect_failed(peer, 1, rc) is called.
 */
1252 void
1253 kiblnd_connect_peer (kib_peer_t *peer)
1254 {
1255         struct rdma_cm_id *cmid;
1256         kib_dev_t         *dev;
1257         kib_net_t         *net = peer->ibp_ni->ni_data;
1258         struct sockaddr_in srcaddr;
1259         struct sockaddr_in dstaddr;
1260         int                rc;
1261
1262         LASSERT (net != NULL);
1263         LASSERT (peer->ibp_connecting > 0);
1264
1265         cmid = kiblnd_rdma_create_id(kiblnd_cm_callback, peer, RDMA_PS_TCP,
1266                                      IB_QPT_RC);
1267
1268         if (IS_ERR(cmid)) {
1269                 CERROR("Can't create CMID for %s: %ld\n",
1270                        libcfs_nid2str(peer->ibp_nid), PTR_ERR(cmid));
1271                 rc = PTR_ERR(cmid);
1272                 goto failed;
1273         }
1274
             /* Source: the local interface address; sin_port stays 0 from
              * the memset unless kiblnd_resolve_addr() fills it in. */
1275         dev = net->ibn_dev;
1276         memset(&srcaddr, 0, sizeof(srcaddr));
1277         srcaddr.sin_family = AF_INET;
1278         srcaddr.sin_addr.s_addr = htonl(dev->ibd_ifip);
1279
             /* Destination: address taken from the peer's NID, port from the
              * kib_service tunable. */
1280         memset(&dstaddr, 0, sizeof(dstaddr));
1281         dstaddr.sin_family = AF_INET;
1282         dstaddr.sin_port = htons(*kiblnd_tunables.kib_service);
1283         dstaddr.sin_addr.s_addr = htonl(LNET_NIDADDR(peer->ibp_nid));
1284
1285         kiblnd_peer_addref(peer);               /* cmid's ref */
1286
1287         if (*kiblnd_tunables.kib_use_priv_port) {
1288                 rc = kiblnd_resolve_addr(cmid, &srcaddr, &dstaddr,
1289                                          *kiblnd_tunables.kib_timeout * 1000);
1290         } else {
1291                 rc = rdma_resolve_addr(cmid,
1292                                        (struct sockaddr *)&srcaddr,
1293                                        (struct sockaddr *)&dstaddr,
1294                                        *kiblnd_tunables.kib_timeout * 1000);
1295         }
1296         if (rc != 0) {
1297                 /* Can't initiate address resolution:  */
1298                 CERROR("Can't resolve addr for %s: %d\n",
1299                        libcfs_nid2str(peer->ibp_nid), rc);
1300                 goto failed2;
1301         }
1302
1303         LASSERT (cmid->device != NULL);
1304         CDEBUG(D_NET, "%s: connection bound to %s:%u.%u.%u.%u:%s\n",
1305                libcfs_nid2str(peer->ibp_nid), dev->ibd_ifname,
1306                HIPQUAD(dev->ibd_ifip), cmid->device->name);
1307
1308         return;
1309
             /* Unwind in reverse order: failed2 undoes the cmid's peer ref
              * and the cmid itself; failed only reports the failure. */
1310  failed2:
1311         kiblnd_peer_decref(peer);               /* cmid's ref */
1312         rdma_destroy_id(cmid);
1313  failed:
1314         kiblnd_peer_connect_failed(peer, 1, rc);
1315 }
1316
/*
 * Hand 'tx' to a connection to peer 'nid', creating the peer and starting
 * a connection attempt if necessary.
 *
 * 'tx' may be NULL, in which case nothing is queued and only the
 * peer/connection handling is performed.  A non-NULL tx must have work
 * items set up and no connection assigned yet (asserted).
 *
 * Steps:
 *  1. Under the global read lock: if the peer exists and has a connection,
 *     queue the tx on it and return.
 *  2. Under the global write lock: if the peer exists, either queue the tx
 *     on its connection or, if it has none yet, park the tx on
 *     peer->ibp_tx_queue, and return.
 *  3. With no lock held: create a new peer.  If that fails the tx is
 *     completed with tx_status = -EHOSTUNREACH.
 *  4. Under the write lock again: if some other peer for 'nid' has appeared
 *     meanwhile, use it as in step 2 and drop the ref on the newly created
 *     peer.  Otherwise mark the new peer connecting, park the tx on it, add
 *     it to the peer table and call kiblnd_connect_peer().
 *
 * Nothing is returned; failures are reported by completing the tx.
 */
1317 void
1318 kiblnd_launch_tx (lnet_ni_t *ni, kib_tx_t *tx, lnet_nid_t nid)
1319 {
1320         kib_peer_t        *peer;
1321         kib_peer_t        *peer2;
1322         kib_conn_t        *conn;
1323         cfs_rwlock_t      *g_lock = &kiblnd_data.kib_global_lock;
1324         unsigned long      flags;
1325         int                rc;
1326
1327         /* If I get here, I've committed to send, so I complete the tx with
1328          * failure on any problems */
1329
1330         LASSERT (tx == NULL || tx->tx_conn == NULL); /* only set when assigned a conn */
1331         LASSERT (tx == NULL || tx->tx_nwrq > 0);     /* work items have been set up */
1332
1333         /* First time, just use a read lock since I expect to find my peer
1334          * connected */
1335         cfs_read_lock_irqsave(g_lock, flags);
1336
1337         peer = kiblnd_find_peer_locked(nid);
1338         if (peer != NULL && !cfs_list_empty(&peer->ibp_conns)) {
1339                 /* Found a peer with an established connection */
1340                 conn = kiblnd_get_conn_locked(peer);
1341                 kiblnd_conn_addref(conn); /* 1 ref for me... */
1342
1343                 cfs_read_unlock_irqrestore(g_lock, flags);
1344
1345                 if (tx != NULL)
1346                         kiblnd_queue_tx(tx, conn);
1347                 kiblnd_conn_decref(conn); /* ...to here */
1348                 return;
1349         }
1350
             /* NOTE(review): the read lock is released with the plain
              * unlock and the write lock taken with the plain lock, so the
              * 'flags' saved by cfs_read_lock_irqsave() above is what the
              * cfs_write_unlock_irqrestore() calls below restore.
              * Presumably interrupts stay disabled across the swap -
              * confirm against the libcfs lock wrappers. */
1351         cfs_read_unlock(g_lock);
1352         /* Re-try with a write lock */
1353         cfs_write_lock(g_lock);
1354
1355         peer = kiblnd_find_peer_locked(nid);
1356         if (peer != NULL) {
1357                 if (cfs_list_empty(&peer->ibp_conns)) {
1358                         /* found a peer, but it's still connecting... */
1359                         LASSERT (peer->ibp_connecting != 0 ||
1360                                  peer->ibp_accepting != 0);
1361                         if (tx != NULL)
1362                                 cfs_list_add_tail(&tx->tx_list,
1363                                                   &peer->ibp_tx_queue);
1364                         cfs_write_unlock_irqrestore(g_lock, flags);
1365                 } else {
1366                         conn = kiblnd_get_conn_locked(peer);
1367                         kiblnd_conn_addref(conn); /* 1 ref for me... */
1368
1369                         cfs_write_unlock_irqrestore(g_lock, flags);
1370
1371                         if (tx != NULL)
1372                                 kiblnd_queue_tx(tx, conn);
1373                         kiblnd_conn_decref(conn); /* ...to here */
1374                 }
1375                 return;
1376         }
1377
1378         cfs_write_unlock_irqrestore(g_lock, flags);
1379
1380         /* Allocate a peer ready to add to the peer table and retry */
1381         rc = kiblnd_create_peer(ni, &peer, nid);
1382         if (rc != 0) {
1383                 CERROR("Can't create peer %s\n", libcfs_nid2str(nid));
1384                 if (tx != NULL) {
1385                         tx->tx_status = -EHOSTUNREACH;
1386                         tx->tx_waiting = 0;
1387                         kiblnd_tx_done(ni, tx);
1388                 }
1389                 return;
1390         }
1391
1392         cfs_write_lock_irqsave(g_lock, flags);
1393
             /* The table was unlocked while the peer was being created, so
              * look again in case another peer for 'nid' was added. */
1394         peer2 = kiblnd_find_peer_locked(nid);
1395         if (peer2 != NULL) {
1396                 if (cfs_list_empty(&peer2->ibp_conns)) {
1397                         /* found a peer, but it's still connecting... */
1398                         LASSERT (peer2->ibp_connecting != 0 ||
1399                                  peer2->ibp_accepting != 0);
1400                         if (tx != NULL)
1401                                 cfs_list_add_tail(&tx->tx_list,
1402                                                   &peer2->ibp_tx_queue);
1403                         cfs_write_unlock_irqrestore(g_lock, flags);
1404                 } else {
1405                         conn = kiblnd_get_conn_locked(peer2);
1406                         kiblnd_conn_addref(conn); /* 1 ref for me... */
1407
1408                         cfs_write_unlock_irqrestore(g_lock, flags);
1409
1410                         if (tx != NULL)
1411                                 kiblnd_queue_tx(tx, conn);
1412                         kiblnd_conn_decref(conn); /* ...to here */
1413                 }
1414
                     /* The peer created above is not needed; drop the ref
                      * held on it. */
1415                 kiblnd_peer_decref(peer);
1416                 return;
1417         }
1418
1419         /* Brand new peer */
1420         LASSERT (peer->ibp_connecting == 0);
1421         peer->ibp_connecting = 1;
1422
1423         /* always called with a ref on ni, which prevents ni being shutdown */
1424         LASSERT (((kib_net_t *)ni->ni_data)->ibn_shutdown == 0);
1425
1426         if (tx != NULL)
1427                 cfs_list_add_tail(&tx->tx_list, &peer->ibp_tx_queue);
1428
             /* Extra ref taken as the peer is linked into the peer table
              * (presumably the table's own ref - TODO confirm); the ref
              * dropped after kiblnd_connect_peer() below balances it or
              * the one from kiblnd_create_peer(). */
1429         kiblnd_peer_addref(peer);
1430         cfs_list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));
1431
1432         cfs_write_unlock_irqrestore(g_lock, flags);
1433
1434         kiblnd_connect_peer(peer);
1435         kiblnd_peer_decref(peer);
1436 }
1437
/*
 * Send the LNet message 'lntmsg' over network interface 'ni'.
 *
 * The protocol is chosen by lntmsg->msg_type:
 *   ACK        always sent as IMMEDIATE (payload asserted empty).
 *   GET        sent as IMMEDIATE when routing, when the target is a router,
 *              or when a reply of md_length bytes would fit in an IMMEDIATE
 *              message; otherwise a GET_REQ is sent that describes the
 *              local MD as the sink, and the tx waits for GET_DONE.
 *   REPLY/PUT  sent as IMMEDIATE when the payload fits; otherwise a PUT_REQ
 *              is sent with the payload mapped as the RDMA source, and the
 *              tx waits for PUT_ACK or PUT_NAK.
 *   other      LBUG().
 * "Fits" means the end of the payload inside kib_msg_t lies within
 * IBLND_MSG_SIZE.  For IMMEDIATE the payload is copied into the message
 * buffer.
 *
 * 'private' is not referenced in this function.
 *
 * Returns 0 once the tx has been passed to kiblnd_launch_tx(), -ENOMEM if
 * no idle tx descriptor is available, or -EIO if mapping the buffers or
 * creating the reply message fails (the tx is released with
 * kiblnd_tx_done() in those cases).
 */
1438 int
1439 kiblnd_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1440 {
1441         lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
1442         int               type = lntmsg->msg_type;
1443         lnet_process_id_t target = lntmsg->msg_target;
1444         int               target_is_router = lntmsg->msg_target_is_router;
1445         int               routing = lntmsg->msg_routing;
1446         unsigned int      payload_niov = lntmsg->msg_niov;
1447         struct iovec     *payload_iov = lntmsg->msg_iov;
1448         lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
1449         unsigned int      payload_offset = lntmsg->msg_offset;
1450         unsigned int      payload_nob = lntmsg->msg_len;
1451         kib_msg_t        *ibmsg;
1452         kib_tx_t         *tx;
1453         int               nob;
1454         int               rc;
1455
1456         /* NB 'private' is different depending on what we're sending.... */
1457
1458         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1459                payload_nob, payload_niov, libcfs_id2str(target));
1460
1461         LASSERT (payload_nob == 0 || payload_niov > 0);
1462         LASSERT (payload_niov <= LNET_MAX_IOV);
1463
1464         /* Thread context */
1465         LASSERT (!cfs_in_interrupt());
1466         /* payload is either all vaddrs or all pages */
1467         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1468
             /* A 'break' out of this switch means "send as IMMEDIATE"; the
              * RDMA paths return from inside their case. */
1469         switch (type) {
1470         default:
1471                 LBUG();
1472                 return (-EIO);
1473
1474         case LNET_MSG_ACK:
1475                 LASSERT (payload_nob == 0);
1476                 break;
1477
1478         case LNET_MSG_GET:
1479                 if (routing || target_is_router)
1480                         break;                  /* send IMMEDIATE */
1481
1482                 /* is the REPLY message too small for RDMA? */
1483                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
1484                 if (nob <= IBLND_MSG_SIZE)
1485                         break;                  /* send IMMEDIATE */
1486
1487                 tx = kiblnd_get_idle_tx(ni);
1488                 if (tx == NULL) {
1489                         CERROR("Can't allocate txd for GET to %s: \n",
1490                                libcfs_nid2str(target.nid));
1491                         return -ENOMEM;
1492                 }
1493
1494                 ibmsg = tx->tx_msg;
1495
                     /* Describe the whole local MD (offset 0, md_length
                      * bytes) in the GET request's RDMA descriptor, using the
                      * iov or kiov variant according to LNET_MD_KIOV. */
1496                 if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
1497                         rc = kiblnd_setup_rd_iov(ni, tx,
1498                                                  &ibmsg->ibm_u.get.ibgm_rd,
1499                                                  lntmsg->msg_md->md_niov,
1500                                                  lntmsg->msg_md->md_iov.iov,
1501                                                  0, lntmsg->msg_md->md_length);
1502                 else
1503                         rc = kiblnd_setup_rd_kiov(ni, tx,
1504                                                   &ibmsg->ibm_u.get.ibgm_rd,
1505                                                   lntmsg->msg_md->md_niov,
1506                                                   lntmsg->msg_md->md_iov.kiov,
1507                                                   0, lntmsg->msg_md->md_length);
1508                 if (rc != 0) {
1509                         CERROR("Can't setup GET sink for %s: %d\n",
1510                                libcfs_nid2str(target.nid), rc);
1511                         kiblnd_tx_done(ni, tx);
1512                         return -EIO;
1513                 }
1514
                     /* Body size covers the descriptor up to and including
                      * the tx_nfrags fragments actually in use. */
1515                 nob = offsetof(kib_get_msg_t, ibgm_rd.rd_frags[tx->tx_nfrags]);
1516                 ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie;
1517                 ibmsg->ibm_u.get.ibgm_hdr = *hdr;
1518
1519                 kiblnd_init_tx_msg(ni, tx, IBLND_MSG_GET_REQ, nob);
1520
1521                 tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
1522                 if (tx->tx_lntmsg[1] == NULL) {
1523                         CERROR("Can't create reply for GET -> %s\n",
1524                                libcfs_nid2str(target.nid));
1525                         kiblnd_tx_done(ni, tx);
1526                         return -EIO;
1527                 }
1528
1529                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg[0,1] on completion */
1530                 tx->tx_waiting = 1;             /* waiting for GET_DONE */
1531                 kiblnd_launch_tx(ni, tx, target.nid);
1532                 return 0;
1533
1534         case LNET_MSG_REPLY:
1535         case LNET_MSG_PUT:
1536                 /* Is the payload small enough not to need RDMA? */
1537                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1538                 if (nob <= IBLND_MSG_SIZE)
1539                         break;                  /* send IMMEDIATE */
1540
1541                 tx = kiblnd_get_idle_tx(ni);
1542                 if (tx == NULL) {
1543                         CERROR("Can't allocate %s txd for %s\n",
1544                                type == LNET_MSG_PUT ? "PUT" : "REPLY",
1545                                libcfs_nid2str(target.nid));
1546                         return -ENOMEM;
1547                 }
1548
                     /* Map the payload into tx->tx_rd; only the PUT_REQ
                      * message itself is set up for sending here. */
1549                 if (payload_kiov == NULL)
1550                         rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
1551                                                  payload_niov, payload_iov,
1552                                                  payload_offset, payload_nob);
1553                 else
1554                         rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
1555                                                   payload_niov, payload_kiov,
1556                                                   payload_offset, payload_nob);
1557                 if (rc != 0) {
1558                         CERROR("Can't setup PUT src for %s: %d\n",
1559                                libcfs_nid2str(target.nid), rc);
1560                         kiblnd_tx_done(ni, tx);
1561                         return -EIO;
1562                 }
1563
1564                 ibmsg = tx->tx_msg;
1565                 ibmsg->ibm_u.putreq.ibprm_hdr = *hdr;
1566                 ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie;
1567                 kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_REQ, sizeof(kib_putreq_msg_t));
1568
1569                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1570                 tx->tx_waiting = 1;             /* waiting for PUT_{ACK,NAK} */
1571                 kiblnd_launch_tx(ni, tx, target.nid);
1572                 return 0;
1573         }
1574
1575         /* send IMMEDIATE */
1576
1577         LASSERT (offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob])
1578                  <= IBLND_MSG_SIZE);
1579
1580         tx = kiblnd_get_idle_tx(ni);
1581         if (tx == NULL) {
1582                 CERROR ("Can't send %d to %s: tx descs exhausted\n",
1583                         type, libcfs_nid2str(target.nid));
1584                 return -ENOMEM;
1585         }
1586
1587         ibmsg = tx->tx_msg;
1588         ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1589
             /* Copy the payload into the message buffer at the offset of
              * ibim_payload within kib_msg_t. */
1590         if (payload_kiov != NULL)
1591                 lnet_copy_kiov2flat(IBLND_MSG_SIZE, ibmsg,
1592                                     offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1593                                     payload_niov, payload_kiov,
1594                                     payload_offset, payload_nob);
1595         else
1596                 lnet_copy_iov2flat(IBLND_MSG_SIZE, ibmsg,
1597                                    offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1598                                    payload_niov, payload_iov,
1599                                    payload_offset, payload_nob);
1600
1601         nob = offsetof(kib_immediate_msg_t, ibim_payload[payload_nob]);
1602         kiblnd_init_tx_msg(ni, tx, IBLND_MSG_IMMEDIATE, nob);
1603
1604         tx->tx_lntmsg[0] = lntmsg;              /* finalise lntmsg on completion */
1605         kiblnd_launch_tx(ni, tx, target.nid);
1606         return 0;
1607 }
1608
1609 void
1610 kiblnd_reply (lnet_ni_t *ni, kib_rx_t *rx, lnet_msg_t *lntmsg)
1611 {
1612         lnet_process_id_t target = lntmsg->msg_target;
1613         unsigned int      niov = lntmsg->msg_niov;
1614         struct iovec     *iov = lntmsg->msg_iov;
1615         lnet_kiov_t      *kiov = lntmsg->msg_kiov;
1616         unsigned int      offset = lntmsg->msg_offset;
1617         unsigned int      nob = lntmsg->msg_len;
1618         kib_tx_t         *tx;
1619         int               rc;
1620
1621         tx = kiblnd_get_idle_tx(ni);
1622         if (tx == NULL) {
1623                 CERROR("Can't get tx for REPLY to %s\n",
1624                        libcfs_nid2str(target.nid));
1625                 goto failed_0;
1626         }
1627
1628         if (nob == 0)
1629                 rc = 0;
1630         else if (kiov == NULL)
1631                 rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
1632                                          niov, iov, offset, nob);
1633         else
1634                 rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
1635                                           niov, kiov, offset, nob);
1636
1637         if (rc != 0) {
1638                 CERROR("Can't setup GET src for %s: %d\n",
1639                        libcfs_nid2str(target.nid), rc);
1640                 goto failed_1;
1641         }
1642
1643         rc = kiblnd_init_rdma(rx->rx_conn, tx,
1644                               IBLND_MSG_GET_DONE, nob,
1645                               &rx->rx_msg->ibm_u.get.ibgm_rd,
1646                               rx->rx_msg->ibm_u.get.ibgm_cookie);
1647         if (rc < 0) {
1648                 CERROR("Can't setup rdma for GET from %s: %d\n",
1649                        libcfs_nid2str(target.nid), rc);
1650                 goto failed_1;
1651         }
1652         
1653         if (nob == 0) {
1654                 /* No RDMA: local completion may happen now! */
1655                 lnet_finalize(ni, lntmsg, 0);
1656         } else {
1657                 /* RDMA: lnet_finalize(lntmsg) when it
1658                  * completes */
1659                 tx->tx_lntmsg[0] = lntmsg;
1660         }
1661
1662         kiblnd_queue_tx(tx, rx->rx_conn);
1663         return;
1664
1665  failed_1:
1666         kiblnd_tx_done(ni, tx);
1667  failed_0:
1668         lnet_finalize(ni, lntmsg, -EIO);
1669 }
1670
1671 int
1672 kiblnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
1673              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
1674              unsigned int offset, unsigned int mlen, unsigned int rlen)
1675 {
1676         kib_rx_t    *rx = private;
1677         kib_msg_t   *rxmsg = rx->rx_msg;
1678         kib_conn_t  *conn = rx->rx_conn;
1679         kib_tx_t    *tx;
1680         kib_msg_t   *txmsg;
1681         int          nob;
1682         int          post_credit = IBLND_POSTRX_PEER_CREDIT;
1683         int          rc = 0;
1684
1685         LASSERT (mlen <= rlen);
1686         LASSERT (!cfs_in_interrupt());
1687         /* Either all pages or all vaddrs */
1688         LASSERT (!(kiov != NULL && iov != NULL));
1689
1690         switch (rxmsg->ibm_type) {
1691         default:
1692                 LBUG();
1693
1694         case IBLND_MSG_IMMEDIATE:
1695                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
1696                 if (nob > rx->rx_nob) {
1697                         CERROR ("Immediate message from %s too big: %d(%d)\n",
1698                                 libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
1699                                 nob, rx->rx_nob);
1700                         rc = -EPROTO;
1701                         break;
1702                 }
1703
1704                 if (kiov != NULL)
1705                         lnet_copy_flat2kiov(niov, kiov, offset,
1706                                             IBLND_MSG_SIZE, rxmsg,
1707                                             offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1708                                             mlen);
1709                 else
1710                         lnet_copy_flat2iov(niov, iov, offset,
1711                                            IBLND_MSG_SIZE, rxmsg,
1712                                            offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1713                                            mlen);
1714                 lnet_finalize (ni, lntmsg, 0);
1715                 break;
1716
1717         case IBLND_MSG_PUT_REQ:
1718                 if (mlen == 0) {
1719                         lnet_finalize(ni, lntmsg, 0);
1720                         kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, 0,
1721                                                rxmsg->ibm_u.putreq.ibprm_cookie);
1722                         break;
1723                 }
1724
1725                 tx = kiblnd_get_idle_tx(ni);
1726                 if (tx == NULL) {
1727                         CERROR("Can't allocate tx for %s\n",
1728                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
1729                         /* Not replying will break the connection */
1730                         rc = -ENOMEM;
1731                         break;
1732                 }
1733
1734                 txmsg = tx->tx_msg;
1735                 if (kiov == NULL)
1736                         rc = kiblnd_setup_rd_iov(ni, tx,
1737                                                  &txmsg->ibm_u.putack.ibpam_rd,
1738                                                  niov, iov, offset, mlen);
1739                 else
1740                         rc = kiblnd_setup_rd_kiov(ni, tx,
1741                                                   &txmsg->ibm_u.putack.ibpam_rd,
1742                                                   niov, kiov, offset, mlen);
1743                 if (rc != 0) {
1744                         CERROR("Can't setup PUT sink for %s: %d\n",
1745                                libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
1746                         kiblnd_tx_done(ni, tx);
1747                         /* tell peer it's over */
1748                         kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, rc,
1749                                                rxmsg->ibm_u.putreq.ibprm_cookie);
1750                         break;
1751                 }
1752
1753                 nob = offsetof(kib_putack_msg_t, ibpam_rd.rd_frags[tx->tx_nfrags]);
1754                 txmsg->ibm_u.putack.ibpam_src_cookie = rxmsg->ibm_u.putreq.ibprm_cookie;
1755                 txmsg->ibm_u.putack.ibpam_dst_cookie = tx->tx_cookie;
1756
1757                 kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_ACK, nob);
1758
1759                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1760                 tx->tx_waiting = 1;             /* waiting for PUT_DONE */
1761                 kiblnd_queue_tx(tx, conn);
1762
1763                 /* reposted buffer reserved for PUT_DONE */
1764                 post_credit = IBLND_POSTRX_NO_CREDIT;
1765                 break;
1766
1767         case IBLND_MSG_GET_REQ:
1768                 if (lntmsg != NULL) {
1769                         /* Optimized GET; RDMA lntmsg's payload */
1770                         kiblnd_reply(ni, rx, lntmsg);
1771                 } else {
1772                         /* GET didn't match anything */
1773                         kiblnd_send_completion(rx->rx_conn, IBLND_MSG_GET_DONE,
1774                                                -ENODATA,
1775                                                rxmsg->ibm_u.get.ibgm_cookie);
1776                 }
1777                 break;
1778         }
1779
1780         kiblnd_post_rx(rx, post_credit);
1781         return rc;
1782 }
1783
1784 int
1785 kiblnd_thread_start (int (*fn)(void *arg), void *arg)
1786 {
1787         long    pid = cfs_create_thread (fn, arg, 0);
1788
1789         if (pid < 0)
1790                 return ((int)pid);
1791
1792         cfs_atomic_inc (&kiblnd_data.kib_nthreads);
1793         return (0);
1794 }
1795
1796 void
1797 kiblnd_thread_fini (void)
1798 {
1799         cfs_atomic_dec (&kiblnd_data.kib_nthreads);
1800 }
1801
1802 void
1803 kiblnd_peer_alive (kib_peer_t *peer)
1804 {
1805         /* This is racy, but everyone's only writing cfs_time_current() */
1806         peer->ibp_last_alive = cfs_time_current();
1807         cfs_mb();
1808 }
1809
1810 void
1811 kiblnd_peer_notify (kib_peer_t *peer)
1812 {
1813         int           error = 0;
1814         cfs_time_t    last_alive = 0;
1815         unsigned long flags;
1816
1817         cfs_read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1818
1819         if (cfs_list_empty(&peer->ibp_conns) &&
1820             peer->ibp_accepting == 0 &&
1821             peer->ibp_connecting == 0 &&
1822             peer->ibp_error != 0) {
1823                 error = peer->ibp_error;
1824                 peer->ibp_error = 0;
1825
1826                 last_alive = peer->ibp_last_alive;
1827         }
1828
1829         cfs_read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1830
1831         if (error != 0)
1832                 lnet_notify(peer->ibp_ni,
1833                             peer->ibp_nid, 0, last_alive);
1834 }
1835
1836 void
1837 kiblnd_close_conn_locked (kib_conn_t *conn, int error)
1838 {
1839         /* This just does the immediate housekeeping.  'error' is zero for a
1840          * normal shutdown which can happen only after the connection has been
1841          * established.  If the connection is established, schedule the
1842          * connection to be finished off by the connd.  Otherwise the connd is
1843          * already dealing with it (either to set it up or tear it down).
1844          * Caller holds kib_global_lock exclusively in irq context */
1845         kib_peer_t       *peer = conn->ibc_peer;
1846         kib_dev_t        *dev;
1847         unsigned long     flags;
1848
1849         LASSERT (error != 0 || conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1850
1851         if (error != 0 && conn->ibc_comms_error == 0)
1852                 conn->ibc_comms_error = error;
1853
1854         if (conn->ibc_state != IBLND_CONN_ESTABLISHED)
1855                 return; /* already being handled  */
1856
1857         if (error == 0 &&
1858             cfs_list_empty(&conn->ibc_tx_noops) &&
1859             cfs_list_empty(&conn->ibc_tx_queue) &&
1860             cfs_list_empty(&conn->ibc_tx_queue_rsrvd) &&
1861             cfs_list_empty(&conn->ibc_tx_queue_nocred) &&
1862             cfs_list_empty(&conn->ibc_active_txs)) {
1863                 CDEBUG(D_NET, "closing conn to %s\n", 
1864                        libcfs_nid2str(peer->ibp_nid));
1865         } else {
1866                 CNETERR("Closing conn to %s: error %d%s%s%s%s%s\n",
1867                        libcfs_nid2str(peer->ibp_nid), error,
1868                        cfs_list_empty(&conn->ibc_tx_queue) ? "" : "(sending)",
1869                        cfs_list_empty(&conn->ibc_tx_noops) ? "" : "(sending_noops)",
1870                        cfs_list_empty(&conn->ibc_tx_queue_rsrvd) ? "" : "(sending_rsrvd)",
1871                        cfs_list_empty(&conn->ibc_tx_queue_nocred) ? "" : "(sending_nocred)",
1872                        cfs_list_empty(&conn->ibc_active_txs) ? "" : "(waiting)");
1873         }
1874
1875         dev = ((kib_net_t *)peer->ibp_ni->ni_data)->ibn_dev;
1876         cfs_list_del(&conn->ibc_list);
1877         /* connd (see below) takes over ibc_list's ref */
1878
1879         if (cfs_list_empty (&peer->ibp_conns) &&    /* no more conns */
1880             kiblnd_peer_active(peer)) {         /* still in peer table */
1881                 kiblnd_unlink_peer_locked(peer);
1882
1883                 /* set/clear error on last conn */
1884                 peer->ibp_error = conn->ibc_comms_error;
1885         }
1886
1887         kiblnd_set_conn_state(conn, IBLND_CONN_CLOSING);
1888
1889         if (error != 0 &&
1890             kiblnd_dev_can_failover(dev)) {
1891                 cfs_list_add_tail(&dev->ibd_fail_list,
1892                               &kiblnd_data.kib_failed_devs);
1893                 cfs_waitq_signal(&kiblnd_data.kib_failover_waitq);
1894         }
1895
1896         cfs_spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
1897
1898         cfs_list_add_tail (&conn->ibc_list, &kiblnd_data.kib_connd_conns);
1899         cfs_waitq_signal (&kiblnd_data.kib_connd_waitq);
1900
1901         cfs_spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
1902 }
1903
1904 void
1905 kiblnd_close_conn (kib_conn_t *conn, int error)
1906 {
1907         unsigned long flags;
1908
1909         cfs_write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1910
1911         kiblnd_close_conn_locked(conn, error);
1912
1913         cfs_write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1914 }
1915
1916 void
1917 kiblnd_handle_early_rxs(kib_conn_t *conn)
1918 {
1919         unsigned long    flags;
1920         kib_rx_t        *rx;
1921
1922         LASSERT (!cfs_in_interrupt());
1923         LASSERT (conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1924
1925         cfs_write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1926         while (!cfs_list_empty(&conn->ibc_early_rxs)) {
1927                 rx = cfs_list_entry(conn->ibc_early_rxs.next,
1928                                 kib_rx_t, rx_list);
1929                 cfs_list_del(&rx->rx_list);
1930                 cfs_write_unlock_irqrestore(&kiblnd_data.kib_global_lock,
1931                                             flags);
1932
1933                 kiblnd_handle_rx(rx);
1934
1935                 cfs_write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1936         }
1937         cfs_write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1938 }
1939
1940 void
1941 kiblnd_abort_txs(kib_conn_t *conn, cfs_list_t *txs)
1942 {
1943         CFS_LIST_HEAD       (zombies);
1944         cfs_list_t          *tmp;
1945         cfs_list_t          *nxt;
1946         kib_tx_t            *tx;
1947
1948         cfs_spin_lock(&conn->ibc_lock);
1949
1950         cfs_list_for_each_safe (tmp, nxt, txs) {
1951                 tx = cfs_list_entry (tmp, kib_tx_t, tx_list);
1952
1953                 if (txs == &conn->ibc_active_txs) {
1954                         LASSERT (!tx->tx_queued);
1955                         LASSERT (tx->tx_waiting ||
1956                                  tx->tx_sending != 0);
1957                 } else {
1958                         LASSERT (tx->tx_queued);
1959                 }
1960
1961                 tx->tx_status = -ECONNABORTED;
1962                 tx->tx_waiting = 0;
1963
1964                 if (tx->tx_sending == 0) {
1965                         tx->tx_queued = 0;
1966                         cfs_list_del (&tx->tx_list);
1967                         cfs_list_add (&tx->tx_list, &zombies);
1968                 }
1969         }
1970
1971         cfs_spin_unlock(&conn->ibc_lock);
1972
1973         kiblnd_txlist_done(conn->ibc_peer->ibp_ni,
1974                            &zombies, -ECONNABORTED);
1975 }
1976
1977 void
1978 kiblnd_finalise_conn (kib_conn_t *conn)
1979 {
1980         LASSERT (!cfs_in_interrupt());
1981         LASSERT (conn->ibc_state > IBLND_CONN_INIT);
1982
1983         kiblnd_set_conn_state(conn, IBLND_CONN_DISCONNECTED);
1984
1985         /* abort_receives moves QP state to IB_QPS_ERR.  This is only required
1986          * for connections that didn't get as far as being connected, because
1987          * rdma_disconnect() does this for free. */
1988         kiblnd_abort_receives(conn);
1989
1990         /* Complete all tx descs not waiting for sends to complete.
1991          * NB we should be safe from RDMA now that the QP has changed state */
1992
1993         kiblnd_abort_txs(conn, &conn->ibc_tx_noops);
1994         kiblnd_abort_txs(conn, &conn->ibc_tx_queue);
1995         kiblnd_abort_txs(conn, &conn->ibc_tx_queue_rsrvd);
1996         kiblnd_abort_txs(conn, &conn->ibc_tx_queue_nocred);
1997         kiblnd_abort_txs(conn, &conn->ibc_active_txs);
1998
1999         kiblnd_handle_early_rxs(conn);
2000 }
2001
2002 void
2003 kiblnd_peer_connect_failed (kib_peer_t *peer, int active, int error)
2004 {
2005         CFS_LIST_HEAD    (zombies);
2006         unsigned long     flags;
2007
2008         LASSERT (error != 0);
2009         LASSERT (!cfs_in_interrupt());
2010
2011         cfs_write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2012
2013         if (active) {
2014                 LASSERT (peer->ibp_connecting > 0);
2015                 peer->ibp_connecting--;
2016         } else {
2017                 LASSERT (peer->ibp_accepting > 0);
2018                 peer->ibp_accepting--;
2019         }
2020
2021         if (peer->ibp_connecting != 0 ||
2022             peer->ibp_accepting != 0) {
2023                 /* another connection attempt under way... */
2024                 cfs_write_unlock_irqrestore(&kiblnd_data.kib_global_lock,
2025                                             flags);
2026                 return;
2027         }
2028
2029         if (cfs_list_empty(&peer->ibp_conns)) {
2030                 /* Take peer's blocked transmits to complete with error */
2031                 cfs_list_add(&zombies, &peer->ibp_tx_queue);
2032                 cfs_list_del_init(&peer->ibp_tx_queue);
2033
2034                 if (kiblnd_peer_active(peer))
2035                         kiblnd_unlink_peer_locked(peer);
2036
2037                 peer->ibp_error = error;
2038         } else {
2039                 /* Can't have blocked transmits if there are connections */
2040                 LASSERT (cfs_list_empty(&peer->ibp_tx_queue));
2041         }
2042
2043         cfs_write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2044
2045         kiblnd_peer_notify(peer);
2046
2047         if (cfs_list_empty (&zombies))
2048                 return;
2049
2050         CNETERR("Deleting messages for %s: connection failed\n",
2051                 libcfs_nid2str(peer->ibp_nid));
2052
2053         kiblnd_txlist_done(peer->ibp_ni, &zombies, -EHOSTUNREACH);
2054 }
2055
2056 void
2057 kiblnd_connreq_done(kib_conn_t *conn, int status)
2058 {
2059         kib_peer_t        *peer = conn->ibc_peer;
2060         kib_tx_t          *tx;
2061         cfs_list_t         txs;
2062         unsigned long      flags;
2063         int                active;
2064
2065         active = (conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2066
2067         CDEBUG(D_NET,"%s: active(%d), version(%x), status(%d)\n",
2068                libcfs_nid2str(peer->ibp_nid), active,
2069                conn->ibc_version, status);
2070
2071         LASSERT (!cfs_in_interrupt());
2072         LASSERT ((conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT &&
2073                   peer->ibp_connecting > 0) ||
2074                  (conn->ibc_state == IBLND_CONN_PASSIVE_WAIT &&
2075                   peer->ibp_accepting > 0));
2076
2077         LIBCFS_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
2078         conn->ibc_connvars = NULL;
2079
2080         if (status != 0) {
2081                 /* failed to establish connection */
2082                 kiblnd_peer_connect_failed(peer, active, status);
2083                 kiblnd_finalise_conn(conn);
2084                 return;
2085         }
2086
2087         /* connection established */
2088         cfs_write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2089
2090         conn->ibc_last_send = jiffies;
2091         kiblnd_set_conn_state(conn, IBLND_CONN_ESTABLISHED);
2092         kiblnd_peer_alive(peer);
2093
2094         /* Add conn to peer's list and nuke any dangling conns from a different
2095          * peer instance... */
2096         kiblnd_conn_addref(conn);               /* +1 ref for ibc_list */
2097         cfs_list_add(&conn->ibc_list, &peer->ibp_conns);
2098         if (active)
2099                 peer->ibp_connecting--;
2100         else
2101                 peer->ibp_accepting--;
2102
2103         if (peer->ibp_version == 0) {
2104                 peer->ibp_version     = conn->ibc_version;
2105                 peer->ibp_incarnation = conn->ibc_incarnation;
2106         }
2107
2108         if (peer->ibp_version     != conn->ibc_version ||
2109             peer->ibp_incarnation != conn->ibc_incarnation) {
2110                 kiblnd_close_stale_conns_locked(peer, conn->ibc_version,
2111                                                 conn->ibc_incarnation);
2112                 peer->ibp_version     = conn->ibc_version;
2113                 peer->ibp_incarnation = conn->ibc_incarnation;
2114         }
2115
2116         /* grab pending txs while I have the lock */
2117         cfs_list_add(&txs, &peer->ibp_tx_queue);
2118         cfs_list_del_init(&peer->ibp_tx_queue);
2119
2120         if (!kiblnd_peer_active(peer) ||        /* peer has been deleted */
2121             conn->ibc_comms_error != 0) {       /* error has happened already */
2122                 lnet_ni_t *ni = peer->ibp_ni;
2123
2124                 /* start to shut down connection */
2125                 kiblnd_close_conn_locked(conn, -ECONNABORTED);
2126                 cfs_write_unlock_irqrestore(&kiblnd_data.kib_global_lock,
2127                                             flags);
2128
2129                 kiblnd_txlist_done(ni, &txs, -ECONNABORTED);
2130
2131                 return;
2132         }
2133
2134         cfs_write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2135
2136         /* Schedule blocked txs */
2137         cfs_spin_lock (&conn->ibc_lock);
2138         while (!cfs_list_empty (&txs)) {
2139                 tx = cfs_list_entry (txs.next, kib_tx_t, tx_list);
2140                 cfs_list_del(&tx->tx_list);
2141
2142                 kiblnd_queue_tx_locked(tx, conn);
2143         }
2144         cfs_spin_unlock (&conn->ibc_lock);
2145
2146         kiblnd_check_sends(conn);
2147
2148         /* schedule blocked rxs */
2149         kiblnd_handle_early_rxs(conn);
2150 }
2151
2152 void
2153 kiblnd_reject(struct rdma_cm_id *cmid, kib_rej_t *rej)
2154 {
2155         int          rc;
2156
2157         rc = rdma_reject(cmid, rej, sizeof(*rej));
2158
2159         if (rc != 0)
2160                 CWARN("Error %d sending reject\n", rc);
2161 }
2162
2163 int
2164 kiblnd_passive_connect (struct rdma_cm_id *cmid, void *priv, int priv_nob)
2165 {
2166         cfs_rwlock_t          *g_lock = &kiblnd_data.kib_global_lock;
2167         kib_msg_t             *reqmsg = priv;
2168         kib_msg_t             *ackmsg;
2169         kib_dev_t             *ibdev;
2170         kib_peer_t            *peer;
2171         kib_peer_t            *peer2;
2172         kib_conn_t            *conn;
2173         lnet_ni_t             *ni  = NULL;
2174         kib_net_t             *net = NULL;
2175         lnet_nid_t             nid;
2176         struct rdma_conn_param cp;
2177         kib_rej_t              rej;
2178         int                    version = IBLND_MSG_VERSION;
2179         unsigned long          flags;
2180         int                    rc;
2181         struct sockaddr_in    *peer_addr;
2182         LASSERT (!cfs_in_interrupt());
2183
2184         /* cmid inherits 'context' from the corresponding listener id */
2185         ibdev = (kib_dev_t *)cmid->context;
2186         LASSERT (ibdev != NULL);
2187
2188         memset(&rej, 0, sizeof(rej));
2189         rej.ibr_magic                = IBLND_MSG_MAGIC;
2190         rej.ibr_why                  = IBLND_REJECT_FATAL;
2191         rej.ibr_cp.ibcp_max_msg_size = IBLND_MSG_SIZE;
2192
2193         peer_addr = (struct sockaddr_in *)&(cmid->route.addr.dst_addr);
2194         if (*kiblnd_tunables.kib_require_priv_port &&
2195             ntohs(peer_addr->sin_port) >= PROT_SOCK) {
2196                 __u32 ip = ntohl(peer_addr->sin_addr.s_addr);
2197                 CERROR("Peer's port (%u.%u.%u.%u:%hu) is not privileged\n",
2198                        HIPQUAD(ip), ntohs(peer_addr->sin_port));
2199                 goto failed;
2200         }
2201
2202         if (priv_nob < offsetof(kib_msg_t, ibm_type)) {
2203                 CERROR("Short connection request\n");
2204                 goto failed;
2205         }
2206
2207         /* Future protocol version compatibility support!  If the
2208          * o2iblnd-specific protocol changes, or when LNET unifies
2209          * protocols over all LNDs, the initial connection will
2210          * negotiate a protocol version.  I trap this here to avoid
2211          * console errors; the reject tells the peer which protocol I
2212          * speak. */
2213         if (reqmsg->ibm_magic == LNET_PROTO_MAGIC ||
2214             reqmsg->ibm_magic == __swab32(LNET_PROTO_MAGIC))
2215                 goto failed;
2216         if (reqmsg->ibm_magic == IBLND_MSG_MAGIC &&
2217             reqmsg->ibm_version != IBLND_MSG_VERSION &&
2218             reqmsg->ibm_version != IBLND_MSG_VERSION_1)
2219                 goto failed;
2220         if (reqmsg->ibm_magic == __swab32(IBLND_MSG_MAGIC) &&
2221             reqmsg->ibm_version != __swab16(IBLND_MSG_VERSION) &&
2222             reqmsg->ibm_version != __swab16(IBLND_MSG_VERSION_1))
2223                 goto failed;
2224
2225         rc = kiblnd_unpack_msg(reqmsg, priv_nob);
2226         if (rc != 0) {
2227                 CERROR("Can't parse connection request: %d\n", rc);
2228                 goto failed;
2229         }
2230
2231         nid = reqmsg->ibm_srcnid;
2232         ni  = lnet_net2ni(LNET_NIDNET(reqmsg->ibm_dstnid));
2233
2234         if (ni != NULL) {
2235                 net = (kib_net_t *)ni->ni_data;
2236                 rej.ibr_incarnation = net->ibn_incarnation;
2237         }
2238
2239         if (ni == NULL ||                         /* no matching net */
2240             ni->ni_nid != reqmsg->ibm_dstnid ||   /* right NET, wrong NID! */
2241             net->ibn_dev != ibdev) {              /* wrong device */
2242                 CERROR("Can't accept %s on %s (%s:%d:%u.%u.%u.%u): "
2243                        "bad dst nid %s\n", libcfs_nid2str(nid),
2244                        ni == NULL ? "NA" : libcfs_nid2str(ni->ni_nid),
2245                        ibdev->ibd_ifname, ibdev->ibd_nnets,
2246                        HIPQUAD(ibdev->ibd_ifip),
2247                        libcfs_nid2str(reqmsg->ibm_dstnid));
2248
2249                 goto failed;
2250         }
2251
2252        /* check time stamp as soon as possible */
2253         if (reqmsg->ibm_dststamp != 0 &&
2254             reqmsg->ibm_dststamp != net->ibn_incarnation) {
2255                 CWARN("Stale connection request\n");
2256                 rej.ibr_why = IBLND_REJECT_CONN_STALE;
2257                 goto failed;
2258         }
2259
2260         /* I can accept peer's version */
2261         version = reqmsg->ibm_version;
2262
2263         if (reqmsg->ibm_type != IBLND_MSG_CONNREQ) {
2264                 CERROR("Unexpected connreq msg type: %x from %s\n",
2265                        reqmsg->ibm_type, libcfs_nid2str(nid));
2266                 goto failed;
2267         }
2268
2269         if (reqmsg->ibm_u.connparams.ibcp_queue_depth !=
2270             IBLND_MSG_QUEUE_SIZE(version)) {
2271                 CERROR("Can't accept %s: incompatible queue depth %d (%d wanted)\n",
2272                        libcfs_nid2str(nid), reqmsg->ibm_u.connparams.ibcp_queue_depth,
2273                        IBLND_MSG_QUEUE_SIZE(version));
2274
2275                 if (version == IBLND_MSG_VERSION)
2276                         rej.ibr_why = IBLND_REJECT_MSG_QUEUE_SIZE;
2277
2278                 goto failed;
2279         }
2280
2281         if (reqmsg->ibm_u.connparams.ibcp_max_frags !=
2282             IBLND_RDMA_FRAGS(version)) {
2283                 CERROR("Can't accept %s(version %x): "
2284                        "incompatible max_frags %d (%d wanted)\n",
2285                        libcfs_nid2str(nid), version,
2286                        reqmsg->ibm_u.connparams.ibcp_max_frags,
2287                        IBLND_RDMA_FRAGS(version));
2288
2289                 if (version == IBLND_MSG_VERSION)
2290                         rej.ibr_why = IBLND_REJECT_RDMA_FRAGS;
2291
2292                 goto failed;
2293
2294         }
2295
2296         if (reqmsg->ibm_u.connparams.ibcp_max_msg_size > IBLND_MSG_SIZE) {
2297                 CERROR("Can't accept %s: message size %d too big (%d max)\n",
2298                        libcfs_nid2str(nid),
2299                        reqmsg->ibm_u.connparams.ibcp_max_msg_size,
2300                        IBLND_MSG_SIZE);
2301                 goto failed;
2302         }
2303
2304         /* assume 'nid' is a new peer; create  */
2305         rc = kiblnd_create_peer(ni, &peer, nid);
2306         if (rc != 0) {
2307                 CERROR("Can't create peer for %s\n", libcfs_nid2str(nid));
2308                 rej.ibr_why = IBLND_REJECT_NO_RESOURCES;
2309                 goto failed;
2310         }
2311
2312         cfs_write_lock_irqsave(g_lock, flags);
2313
2314         peer2 = kiblnd_find_peer_locked(nid);
2315         if (peer2 != NULL) {
2316                 if (peer2->ibp_version == 0) {
2317                         peer2->ibp_version     = version;
2318                         peer2->ibp_incarnation = reqmsg->ibm_srcstamp;
2319                 }
2320
2321                 /* not the guy I've talked with */
2322                 if (peer2->ibp_incarnation != reqmsg->ibm_srcstamp ||
2323                     peer2->ibp_version     != version) {
2324                         kiblnd_close_peer_conns_locked(peer2, -ESTALE);
2325                         cfs_write_unlock_irqrestore(g_lock, flags);
2326
2327                         CWARN("Conn stale %s [old ver: %x, new ver: %x]\n",
2328                               libcfs_nid2str(nid), peer2->ibp_version, version);
2329
2330                         kiblnd_peer_decref(peer);
2331                         rej.ibr_why = IBLND_REJECT_CONN_STALE;
2332                         goto failed;
2333                 }
2334
2335                 /* tie-break connection race in favour of the higher NID */
2336                 if (peer2->ibp_connecting != 0 &&
2337                     nid < ni->ni_nid) {
2338                         cfs_write_unlock_irqrestore(g_lock, flags);
2339
2340                         CWARN("Conn race %s\n", libcfs_nid2str(peer2->ibp_nid));
2341
2342                         kiblnd_peer_decref(peer);
2343                         rej.ibr_why = IBLND_REJECT_CONN_RACE;
2344                         goto failed;
2345                 }
2346
2347                 peer2->ibp_accepting++;
2348                 kiblnd_peer_addref(peer2);
2349
2350                 cfs_write_unlock_irqrestore(g_lock, flags);
2351                 kiblnd_peer_decref(peer);
2352                 peer = peer2;
2353         } else {
2354                 /* Brand new peer */
2355                 LASSERT (peer->ibp_accepting == 0);
2356                 LASSERT (peer->ibp_version == 0 &&
2357                          peer->ibp_incarnation == 0);
2358
2359                 peer->ibp_accepting   = 1;
2360                 peer->ibp_version     = version;
2361                 peer->ibp_incarnation = reqmsg->ibm_srcstamp;
2362
2363                 /* I have a ref on ni that prevents it being shutdown */
2364                 LASSERT (net->ibn_shutdown == 0);
2365
2366                 kiblnd_peer_addref(peer);
2367                 cfs_list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));
2368
2369                 cfs_write_unlock_irqrestore(g_lock, flags);
2370         }
2371
2372         conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_PASSIVE_WAIT, version);
2373         if (conn == NULL) {
2374                 kiblnd_peer_connect_failed(peer, 0, -ENOMEM);
2375                 kiblnd_peer_decref(peer);
2376                 rej.ibr_why = IBLND_REJECT_NO_RESOURCES;
2377                 goto failed;
2378         }
2379
2380         /* conn now "owns" cmid, so I return success from here on to ensure the
2381          * CM callback doesn't destroy cmid. */
2382
2383         conn->ibc_incarnation      = reqmsg->ibm_srcstamp;
2384         conn->ibc_credits          = IBLND_MSG_QUEUE_SIZE(version);
2385         conn->ibc_reserved_credits = IBLND_MSG_QUEUE_SIZE(version);
2386         LASSERT (conn->ibc_credits + conn->ibc_reserved_credits + IBLND_OOB_MSGS(version)
2387                  <= IBLND_RX_MSGS(version));
2388
2389         ackmsg = &conn->ibc_connvars->cv_msg;
2390         memset(ackmsg, 0, sizeof(*ackmsg));
2391
2392         kiblnd_init_msg(ackmsg, IBLND_MSG_CONNACK,
2393                         sizeof(ackmsg->ibm_u.connparams));
2394         ackmsg->ibm_u.connparams.ibcp_queue_depth  = IBLND_MSG_QUEUE_SIZE(version);
2395         ackmsg->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;
2396         ackmsg->ibm_u.connparams.ibcp_max_frags    = IBLND_RDMA_FRAGS(version);
2397
2398         kiblnd_pack_msg(ni, ackmsg, version, 0, nid, reqmsg->ibm_srcstamp);
2399
2400         memset(&cp, 0, sizeof(cp));
2401         cp.private_data        = ackmsg;
2402         cp.private_data_len    = ackmsg->ibm_nob;
2403         cp.responder_resources = 0;             /* No atomic ops or RDMA reads */
2404         cp.initiator_depth     = 0;
2405         cp.flow_control        = 1;
2406         cp.retry_count         = *kiblnd_tunables.kib_retry_count;
2407         cp.rnr_retry_count     = *kiblnd_tunables.kib_rnr_retry_count;
2408
2409         CDEBUG(D_NET, "Accept %s\n", libcfs_nid2str(nid));
2410
2411         rc = rdma_accept(cmid, &cp);
2412         if (rc != 0) {
2413                 CERROR("Can't accept %s: %d\n", libcfs_nid2str(nid), rc);
2414                 rej.ibr_version = version;
2415                 rej.ibr_why     = IBLND_REJECT_FATAL;
2416
2417                 kiblnd_reject(cmid, &rej);
2418                 kiblnd_connreq_done(conn, rc);
2419                 kiblnd_conn_decref(conn);
2420         }
2421
2422         lnet_ni_decref(ni);
2423         return 0;
2424
2425  failed:
2426         if (ni != NULL)
2427                 lnet_ni_decref(ni);
2428
2429         rej.ibr_version = version;
2430         rej.ibr_cp.ibcp_queue_depth = IBLND_MSG_QUEUE_SIZE(version);
2431         rej.ibr_cp.ibcp_max_frags   = IBLND_RDMA_FRAGS(version);
2432         kiblnd_reject(cmid, &rej);
2433
2434         return -ECONNREFUSED;
2435 }
2436
2437 void
2438 kiblnd_reconnect (kib_conn_t *conn, int version,
2439                   __u64 incarnation, int why, kib_connparams_t *cp)
2440 {
2441         kib_peer_t    *peer = conn->ibc_peer;
2442         char          *reason;
2443         int            retry = 0;
2444         unsigned long  flags;
2445
2446         LASSERT (conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2447         LASSERT (peer->ibp_connecting > 0);     /* 'conn' at least */
2448
2449         cfs_write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2450
2451         /* retry connection if it's still needed and no other connection
2452          * attempts (active or passive) are in progress
2453          * NB: reconnect is still needed even when ibp_tx_queue is
2454          * empty if ibp_version != version because reconnect may be
2455          * initiated by kiblnd_query() */
2456         if ((!cfs_list_empty(&peer->ibp_tx_queue) ||
2457              peer->ibp_version != version) &&
2458             peer->ibp_connecting == 1 &&
2459             peer->ibp_accepting == 0) {
2460                 retry = 1;
2461                 peer->ibp_connecting++;
2462
2463                 peer->ibp_version     = version;
2464                 peer->ibp_incarnation = incarnation;
2465         }
2466
2467         cfs_write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2468
2469         if (!retry)
2470                 return;
2471
2472         switch (why) {
2473         default:
2474                 reason = "Unknown";
2475                 break;
2476
2477         case IBLND_REJECT_CONN_STALE:
2478                 reason = "stale";
2479                 break;
2480
2481         case IBLND_REJECT_CONN_RACE:
2482                 reason = "conn race";
2483                 break;
2484
2485         case IBLND_REJECT_CONN_UNCOMPAT:
2486                 reason = "version negotiation";
2487                 break;
2488         }
2489
2490         CNETERR("%s: retrying (%s), %x, %x, "
2491                 "queue_dep: %d, max_frag: %d, msg_size: %d\n",
2492                 libcfs_nid2str(peer->ibp_nid),
2493                 reason, IBLND_MSG_VERSION, version,
2494                 cp != NULL? cp->ibcp_queue_depth :IBLND_MSG_QUEUE_SIZE(version),
2495                 cp != NULL? cp->ibcp_max_frags   : IBLND_RDMA_FRAGS(version),
2496                 cp != NULL? cp->ibcp_max_msg_size: IBLND_MSG_SIZE);
2497
2498         kiblnd_connect_peer(peer);
2499 }
2500
2501 void
2502 kiblnd_rejected (kib_conn_t *conn, int reason, void *priv, int priv_nob)
2503 {
2504         kib_peer_t    *peer = conn->ibc_peer;
2505
2506         LASSERT (!cfs_in_interrupt());
2507         LASSERT (conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2508
2509         switch (reason) {
2510         case IB_CM_REJ_STALE_CONN:
2511                 kiblnd_reconnect(conn, IBLND_MSG_VERSION, 0,
2512                                  IBLND_REJECT_CONN_STALE, NULL);
2513                 break;
2514
2515         case IB_CM_REJ_INVALID_SERVICE_ID:
2516                 CNETERR("%s rejected: no listener at %d\n",
2517                         libcfs_nid2str(peer->ibp_nid),
2518                         *kiblnd_tunables.kib_service);
2519                 break;
2520
2521         case IB_CM_REJ_CONSUMER_DEFINED:
2522                 if (priv_nob >= offsetof(kib_rej_t, ibr_padding)) {
2523                         kib_rej_t        *rej         = priv;
2524                         kib_connparams_t *cp          = NULL;
2525                         int               flip        = 0;
2526                         __u64             incarnation = -1;
2527
2528                         /* NB. default incarnation is -1 because:
2529                          * a) V1 will ignore dst incarnation in connreq.
2530                          * b) V2 will provide incarnation while rejecting me,
2531                          *    -1 will be overwrote.
2532                          *
2533                          * if I try to connect to a V1 peer with V2 protocol,
2534                          * it rejected me then upgrade to V2, I have no idea
2535                          * about the upgrading and try to reconnect with V1,
2536                          * in this case upgraded V2 can find out I'm trying to
2537                          * talk to the old guy and reject me(incarnation is -1). 
2538                          */
2539
2540                         if (rej->ibr_magic == __swab32(IBLND_MSG_MAGIC) ||
2541                             rej->ibr_magic == __swab32(LNET_PROTO_MAGIC)) {
2542                                 __swab32s(&rej->ibr_magic);
2543                                 __swab16s(&rej->ibr_version);
2544                                 flip = 1;
2545                         }
2546
2547                         if (priv_nob >= sizeof(kib_rej_t) &&
2548                             rej->ibr_version > IBLND_MSG_VERSION_1) {
2549                                 /* priv_nob is always 148 in current version
2550                                  * of OFED, so we still need to check version.
2551                                  * (define of IB_CM_REJ_PRIVATE_DATA_SIZE) */
2552                                 cp = &rej->ibr_cp;
2553
2554                                 if (flip) {
2555                                         __swab64s(&rej->ibr_incarnation);
2556                                         __swab16s(&cp->ibcp_queue_depth);
2557                                         __swab16s(&cp->ibcp_max_frags);
2558                                         __swab32s(&cp->ibcp_max_msg_size);
2559                                 }
2560
2561                                 incarnation = rej->ibr_incarnation;
2562                         }
2563
2564                         if (rej->ibr_magic != IBLND_MSG_MAGIC &&
2565                             rej->ibr_magic != LNET_PROTO_MAGIC) {
2566                                 CERROR("%s rejected: consumer defined fatal error\n",
2567                                        libcfs_nid2str(peer->ibp_nid));
2568                                 break;
2569                         }
2570
2571                         if (rej->ibr_version != IBLND_MSG_VERSION &&
2572                             rej->ibr_version != IBLND_MSG_VERSION_1) {
2573                                 CERROR("%s rejected: o2iblnd version %x error\n",
2574                                        libcfs_nid2str(peer->ibp_nid),
2575                                        rej->ibr_version);
2576                                 break;
2577                         }
2578
2579                         if (rej->ibr_why     == IBLND_REJECT_FATAL &&
2580                             rej->ibr_version == IBLND_MSG_VERSION_1) {
2581                                 CDEBUG(D_NET, "rejected by old version peer %s: %x\n",
2582                                        libcfs_nid2str(peer->ibp_nid), rej->ibr_version);
2583
2584                                 if (conn->ibc_version != IBLND_MSG_VERSION_1)
2585                                         rej->ibr_why = IBLND_REJECT_CONN_UNCOMPAT;
2586                         }
2587
2588                         switch (rej->ibr_why) {
2589                         case IBLND_REJECT_CONN_RACE:
2590                         case IBLND_REJECT_CONN_STALE:
2591                         case IBLND_REJECT_CONN_UNCOMPAT:
2592                                 kiblnd_reconnect(conn, rej->ibr_version,
2593                                                  incarnation, rej->ibr_why, cp);
2594                                 break;
2595
2596                         case IBLND_REJECT_MSG_QUEUE_SIZE:
2597                                 CERROR("%s rejected: incompatible message queue depth %d, %d\n",
2598                                        libcfs_nid2str(peer->ibp_nid), cp->ibcp_queue_depth,
2599                                        IBLND_MSG_QUEUE_SIZE(conn->ibc_version));
2600                                 break;
2601
2602                         case IBLND_REJECT_RDMA_FRAGS:
2603                                 CERROR("%s rejected: incompatible # of RDMA fragments %d, %d\n",
2604                                        libcfs_nid2str(peer->ibp_nid), cp->ibcp_max_frags,
2605                                        IBLND_RDMA_FRAGS(conn->ibc_version));
2606                                 break;
2607
2608                         case IBLND_REJECT_NO_RESOURCES:
2609                                 CERROR("%s rejected: o2iblnd no resources\n",
2610                                        libcfs_nid2str(peer->ibp_nid));
2611                                 break;
2612
2613                         case IBLND_REJECT_FATAL:
2614                                 CERROR("%s rejected: o2iblnd fatal error\n",
2615                                        libcfs_nid2str(peer->ibp_nid));
2616                                 break;
2617
2618                         default:
2619                                 CERROR("%s rejected: o2iblnd reason %d\n",
2620                                        libcfs_nid2str(peer->ibp_nid),
2621                                        rej->ibr_why);
2622                                 break;
2623                         }
2624                         break;
2625                 }
2626                 /* fall through */
2627         default:
2628                 CNETERR("%s rejected: reason %d, size %d\n",
2629                         libcfs_nid2str(peer->ibp_nid), reason, priv_nob);
2630                 break;
2631         }
2632
2633         kiblnd_connreq_done(conn, -ECONNREFUSED);
2634 }
2635
2636 void
2637 kiblnd_check_connreply (kib_conn_t *conn, void *priv, int priv_nob)
2638 {
2639         kib_peer_t    *peer = conn->ibc_peer;
2640         lnet_ni_t     *ni   = peer->ibp_ni;
2641         kib_net_t     *net  = ni->ni_data;
2642         kib_msg_t     *msg  = priv;
2643         int            ver  = conn->ibc_version;
2644         int            rc   = kiblnd_unpack_msg(msg, priv_nob);
2645         unsigned long  flags;
2646
2647         LASSERT (net != NULL);
2648
2649         if (rc != 0) {
2650                 CERROR("Can't unpack connack from %s: %d\n",
2651                        libcfs_nid2str(peer->ibp_nid), rc);
2652                 goto failed;
2653         }
2654
2655         if (msg->ibm_type != IBLND_MSG_CONNACK) {
2656                 CERROR("Unexpected message %d from %s\n",
2657                        msg->ibm_type, libcfs_nid2str(peer->ibp_nid));
2658                 rc = -EPROTO;
2659                 goto failed;
2660         }
2661
2662         if (ver != msg->ibm_version) {
2663                 CERROR("%s replied version %x is different with "
2664                        "requested version %x\n",
2665                        libcfs_nid2str(peer->ibp_nid), msg->ibm_version, ver);
2666                 rc = -EPROTO;
2667                 goto failed;
2668         }
2669
2670         if (msg->ibm_u.connparams.ibcp_queue_depth !=
2671             IBLND_MSG_QUEUE_SIZE(ver)) {
2672                 CERROR("%s has incompatible queue depth %d(%d wanted)\n",
2673                        libcfs_nid2str(peer->ibp_nid),
2674                        msg->ibm_u.connparams.ibcp_queue_depth,
2675                        IBLND_MSG_QUEUE_SIZE(ver));
2676                 rc = -EPROTO;
2677                 goto failed;
2678         }
2679
2680         if (msg->ibm_u.connparams.ibcp_max_frags !=
2681             IBLND_RDMA_FRAGS(ver)) {
2682                 CERROR("%s has incompatible max_frags %d (%d wanted)\n",
2683                        libcfs_nid2str(peer->ibp_nid),
2684                        msg->ibm_u.connparams.ibcp_max_frags,
2685                        IBLND_RDMA_FRAGS(ver));
2686                 rc = -EPROTO;
2687                 goto failed;
2688         }
2689
2690         if (msg->ibm_u.connparams.ibcp_max_msg_size > IBLND_MSG_SIZE) {
2691                 CERROR("%s max message size %d too big (%d max)\n",
2692                        libcfs_nid2str(peer->ibp_nid),
2693                        msg->ibm_u.connparams.ibcp_max_msg_size,
2694                        IBLND_MSG_SIZE);
2695                 rc = -EPROTO;
2696                 goto failed;
2697         }
2698
2699         cfs_read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2700         if (msg->ibm_dstnid == ni->ni_nid &&
2701             msg->ibm_dststamp == net->ibn_incarnation)
2702                 rc = 0;
2703         else
2704                 rc = -ESTALE;
2705         cfs_read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2706
2707         if (rc != 0) {
2708                 CERROR("Bad connection reply from %s, rc = %d, "
2709                        "version: %x max_frags: %d\n",
2710                        libcfs_nid2str(peer->ibp_nid), rc,
2711                        msg->ibm_version, msg->ibm_u.connparams.ibcp_max_frags);
2712                 goto failed;
2713         }
2714
2715         conn->ibc_incarnation      = msg->ibm_srcstamp;
2716         conn->ibc_credits          =
2717         conn->ibc_reserved_credits = IBLND_MSG_QUEUE_SIZE(ver);
2718         LASSERT (conn->ibc_credits + conn->ibc_reserved_credits + IBLND_OOB_MSGS(ver)
2719                  <= IBLND_RX_MSGS(ver));
2720
2721         kiblnd_connreq_done(conn, 0);
2722         return;
2723
2724  failed:
2725         /* NB My QP has already established itself, so I handle anything going
2726          * wrong here by setting ibc_comms_error.
2727          * kiblnd_connreq_done(0) moves the conn state to ESTABLISHED, but then
2728          * immediately tears it down. */
2729
2730         LASSERT (rc != 0);
2731         conn->ibc_comms_error = rc;
2732         kiblnd_connreq_done(conn, 0);
2733 }
2734
2735 int
2736 kiblnd_active_connect (struct rdma_cm_id *cmid)
2737 {
2738         kib_peer_t              *peer = (kib_peer_t *)cmid->context;
2739         kib_conn_t              *conn;
2740         kib_msg_t               *msg;
2741         struct rdma_conn_param   cp;
2742         int                      version;
2743         __u64                    incarnation;
2744         unsigned long            flags;
2745         int                      rc;
2746
2747         cfs_read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2748
2749         incarnation = peer->ibp_incarnation;
2750         version     = (peer->ibp_version == 0) ? IBLND_MSG_VERSION : peer->ibp_version;
2751
2752         cfs_read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2753
2754         conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_ACTIVE_CONNECT, version);
2755         if (conn == NULL) {
2756                 kiblnd_peer_connect_failed(peer, 1, -ENOMEM);
2757                 kiblnd_peer_decref(peer); /* lose cmid's ref */
2758                 return -ENOMEM;
2759         }
2760
2761         /* conn "owns" cmid now, so I return success from here on to ensure the
2762          * CM callback doesn't destroy cmid. conn also takes over cmid's ref
2763          * on peer */
2764
2765         msg = &conn->ibc_connvars->cv_msg;
2766
2767         memset(msg, 0, sizeof(*msg));
2768         kiblnd_init_msg(msg, IBLND_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
2769         msg->ibm_u.connparams.ibcp_queue_depth  = IBLND_MSG_QUEUE_SIZE(version);
2770         msg->ibm_u.connparams.ibcp_max_frags    = IBLND_RDMA_FRAGS(version);
2771         msg->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;
2772
2773         kiblnd_pack_msg(peer->ibp_ni, msg, version,
2774                         0, peer->ibp_nid, incarnation);
2775
2776         memset(&cp, 0, sizeof(cp));
2777         cp.private_data        = msg;
2778         cp.private_data_len    = msg->ibm_nob;
2779         cp.responder_resources = 0;             /* No atomic ops or RDMA reads */
2780         cp.initiator_depth     = 0;
2781         cp.flow_control        = 1;
2782         cp.retry_count         = *kiblnd_tunables.kib_retry_count;
2783         cp.rnr_retry_count     = *kiblnd_tunables.kib_rnr_retry_count;
2784
2785         LASSERT(cmid->context == (void *)conn);
2786         LASSERT(conn->ibc_cmid == cmid);
2787
2788         rc = rdma_connect(cmid, &cp);
2789         if (rc != 0) {
2790                 CERROR("Can't connect to %s: %d\n",
2791                        libcfs_nid2str(peer->ibp_nid), rc);
2792                 kiblnd_connreq_done(conn, rc);
2793                 kiblnd_conn_decref(conn);
2794         }
2795
2796         return 0;
2797 }
2798
2799 int
2800 kiblnd_cm_callback(struct rdma_cm_id *cmid, struct rdma_cm_event *event)
2801 {
2802         kib_peer_t  *peer;
2803         kib_conn_t  *conn;
2804         int          rc;
2805
2806         switch (event->event) {
2807         default:
2808                 CERROR("Unexpected event: %d, status: %d\n",
2809                        event->event, event->status);
2810                 LBUG();
2811
2812         case RDMA_CM_EVENT_CONNECT_REQUEST:
2813                 /* destroy cmid on failure */
2814                 rc = kiblnd_passive_connect(cmid, 
2815                                             (void *)KIBLND_CONN_PARAM(event),
2816                                             KIBLND_CONN_PARAM_LEN(event));
2817                 CDEBUG(D_NET, "connreq: %d\n", rc);
2818                 return rc;
2819                 
2820         case RDMA_CM_EVENT_ADDR_ERROR:
2821                 peer = (kib_peer_t *)cmid->context;
2822                 CNETERR("%s: ADDR ERROR %d\n",
2823                        libcfs_nid2str(peer->ibp_nid), event->status);
2824                 kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
2825                 kiblnd_peer_decref(peer);
2826                 return -EHOSTUNREACH;      /* rc != 0 destroys cmid */
2827
2828         case RDMA_CM_EVENT_ADDR_RESOLVED:
2829                 peer = (kib_peer_t *)cmid->context;
2830
2831                 CDEBUG(D_NET,"%s Addr resolved: %d\n",
2832                        libcfs_nid2str(peer->ibp_nid), event->status);
2833
2834                 if (event->status != 0) {
2835                         CNETERR("Can't resolve address for %s: %d\n",
2836                                 libcfs_nid2str(peer->ibp_nid), event->status);
2837                         rc = event->status;
2838                 } else {
2839                         rc = rdma_resolve_route(
2840                                 cmid, *kiblnd_tunables.kib_timeout * 1000);
2841                         if (rc == 0)
2842                                 return 0;
2843                         /* Can't initiate route resolution */
2844                         CERROR("Can't resolve route for %s: %d\n",
2845                                libcfs_nid2str(peer->ibp_nid), rc);
2846                 }
2847                 kiblnd_peer_connect_failed(peer, 1, rc);
2848                 kiblnd_peer_decref(peer);
2849                 return rc;                      /* rc != 0 destroys cmid */
2850
2851         case RDMA_CM_EVENT_ROUTE_ERROR:
2852                 peer = (kib_peer_t *)cmid->context;
2853                 CNETERR("%s: ROUTE ERROR %d\n",
2854                         libcfs_nid2str(peer->ibp_nid), event->status);
2855                 kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
2856                 kiblnd_peer_decref(peer);
2857                 return -EHOSTUNREACH;           /* rc != 0 destroys cmid */
2858
2859         case RDMA_CM_EVENT_ROUTE_RESOLVED:
2860                 peer = (kib_peer_t *)cmid->context;
2861                 CDEBUG(D_NET,"%s Route resolved: %d\n",
2862                        libcfs_nid2str(peer->ibp_nid), event->status);
2863
2864                 if (event->status == 0)
2865                         return kiblnd_active_connect(cmid);
2866
2867                 CNETERR("Can't resolve route for %s: %d\n",
2868                        libcfs_nid2str(peer->ibp_nid), event->status);
2869                 kiblnd_peer_connect_failed(peer, 1, event->status);
2870                 kiblnd_peer_decref(peer);
2871                 return event->status;           /* rc != 0 destroys cmid */
2872                 
2873         case RDMA_CM_EVENT_UNREACHABLE:
2874                 conn = (kib_conn_t *)cmid->context;
2875                 LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
2876                         conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
2877                 CNETERR("%s: UNREACHABLE %d\n",
2878                        libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
2879                 kiblnd_connreq_done(conn, -ENETDOWN);
2880                 kiblnd_conn_decref(conn);
2881                 return 0;
2882
2883         case RDMA_CM_EVENT_CONNECT_ERROR:
2884                 conn = (kib_conn_t *)cmid->context;
2885                 LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
2886                         conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
2887                 CNETERR("%s: CONNECT ERROR %d\n",
2888                         libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
2889                 kiblnd_connreq_done(conn, -ENOTCONN);
2890                 kiblnd_conn_decref(conn);
2891                 return 0;
2892
2893         case RDMA_CM_EVENT_REJECTED:
2894                 conn = (kib_conn_t *)cmid->context;
2895                 switch (conn->ibc_state) {
2896                 default:
2897                         LBUG();
2898
2899                 case IBLND_CONN_PASSIVE_WAIT:
2900                         CERROR ("%s: REJECTED %d\n",
2901                                 libcfs_nid2str(conn->ibc_peer->ibp_nid),
2902                                 event->status);
2903                         kiblnd_connreq_done(conn, -ECONNRESET);
2904                         break;
2905
2906                 case IBLND_CONN_ACTIVE_CONNECT:
2907                         kiblnd_rejected(conn, event->status,
2908                                         (void *)KIBLND_CONN_PARAM(event),
2909                                         KIBLND_CONN_PARAM_LEN(event));
2910                         break;
2911                 }
2912                 kiblnd_conn_decref(conn);
2913                 return 0;
2914
2915         case RDMA_CM_EVENT_ESTABLISHED:
2916                 conn = (kib_conn_t *)cmid->context;
2917                 switch (conn->ibc_state) {
2918                 default:
2919                         LBUG();
2920
2921                 case IBLND_CONN_PASSIVE_WAIT:
2922                         CDEBUG(D_NET, "ESTABLISHED (passive): %s\n",
2923                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
2924                         kiblnd_connreq_done(conn, 0);
2925                         break;
2926
2927                 case IBLND_CONN_ACTIVE_CONNECT:
2928                         CDEBUG(D_NET, "ESTABLISHED(active): %s\n",
2929                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
2930                         kiblnd_check_connreply(conn,
2931                                                (void *)KIBLND_CONN_PARAM(event),
2932                                                KIBLND_CONN_PARAM_LEN(event));
2933                         break;
2934                 }
2935                 /* net keeps its ref on conn! */
2936                 return 0;
2937
2938 #ifdef HAVE_OFED_RDMA_CMEV_TIMEWAIT_EXIT
2939         case RDMA_CM_EVENT_TIMEWAIT_EXIT:
2940                 CDEBUG(D_NET, "Ignore TIMEWAIT_EXIT event\n");
2941                 return 0;
2942 #endif
2943         case RDMA_CM_EVENT_DISCONNECTED:
2944                 conn = (kib_conn_t *)cmid->context;
2945                 if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
2946                         CERROR("%s DISCONNECTED\n",
2947                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
2948                         kiblnd_connreq_done(conn, -ECONNRESET);
2949                 } else {
2950                         kiblnd_close_conn(conn, 0);
2951                 }
2952                 kiblnd_conn_decref(conn);
2953                 cmid->context = NULL;
2954                 return 0;
2955
2956         case RDMA_CM_EVENT_DEVICE_REMOVAL:
2957                 LCONSOLE_ERROR_MSG(0x131,
2958                                    "Received notification of device removal\n"
2959                                    "Please shutdown LNET to allow this to proceed\n");
2960                 /* Can't remove network from underneath LNET for now, so I have
2961                  * to ignore this */
2962                 return 0;
2963
2964 #ifdef HAVE_OFED_RDMA_CMEV_ADDRCHANGE
2965         case RDMA_CM_EVENT_ADDR_CHANGE:
2966                 LCONSOLE_INFO("Physical link changed (eg hca/port)\n");
2967                 return 0;
2968 #endif
2969         }
2970 }
2971
2972 static int
2973 kiblnd_check_txs_locked(kib_conn_t *conn, cfs_list_t *txs)
2974 {
2975         kib_tx_t          *tx;
2976         cfs_list_t        *ttmp;
2977
2978         cfs_list_for_each (ttmp, txs) {
2979                 tx = cfs_list_entry (ttmp, kib_tx_t, tx_list);
2980
2981                 if (txs != &conn->ibc_active_txs) {
2982                         LASSERT (tx->tx_queued);
2983                 } else {
2984                         LASSERT (!tx->tx_queued);
2985                         LASSERT (tx->tx_waiting || tx->tx_sending != 0);
2986                 }
2987
2988                 if (cfs_time_aftereq (jiffies, tx->tx_deadline)) {
2989                         CERROR("Timed out tx: %s, %lu seconds\n",
2990                                kiblnd_queue2str(conn, txs),
2991                                cfs_duration_sec(jiffies - tx->tx_deadline));
2992                         return 1;
2993                 }
2994         }
2995
2996         return 0;
2997 }
2998
2999 static int
3000 kiblnd_conn_timed_out_locked(kib_conn_t *conn)
3001 {
3002         return  kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue) ||
3003                 kiblnd_check_txs_locked(conn, &conn->ibc_tx_noops) ||
3004                 kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue_rsrvd) ||
3005                 kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue_nocred) ||
3006                 kiblnd_check_txs_locked(conn, &conn->ibc_active_txs);
3007 }
3008
3009 void
3010 kiblnd_check_conns (int idx)
3011 {
3012         CFS_LIST_HEAD (closes);
3013         CFS_LIST_HEAD (checksends);
3014         cfs_list_t    *peers = &kiblnd_data.kib_peers[idx];
3015         cfs_list_t    *ptmp;
3016         kib_peer_t    *peer;
3017         kib_conn_t    *conn;
3018         cfs_list_t    *ctmp;
3019         unsigned long  flags;
3020
3021         /* NB. We expect to have a look at all the peers and not find any
3022          * RDMAs to time out, so we just use a shared lock while we
3023          * take a look... */
3024         cfs_read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
3025
3026         cfs_list_for_each (ptmp, peers) {
3027                 peer = cfs_list_entry (ptmp, kib_peer_t, ibp_list);
3028
3029                 cfs_list_for_each (ctmp, &peer->ibp_conns) {
3030                         int timedout;
3031                         int sendnoop;
3032
3033                         conn = cfs_list_entry(ctmp, kib_conn_t, ibc_list);
3034
3035                         LASSERT (conn->ibc_state == IBLND_CONN_ESTABLISHED);
3036
3037                         cfs_spin_lock(&conn->ibc_lock);
3038
3039                         sendnoop = kiblnd_need_noop(conn);
3040                         timedout = kiblnd_conn_timed_out_locked(conn);
3041                         if (!sendnoop && !timedout) {
3042                                 cfs_spin_unlock(&conn->ibc_lock);
3043                                 continue;
3044                         }
3045
3046                         if (timedout) {
3047                                 CERROR("Timed out RDMA with %s (%lu): "
3048                                        "c: %u, oc: %u, rc: %u\n",
3049                                        libcfs_nid2str(peer->ibp_nid),
3050                                        cfs_duration_sec(cfs_time_current() -
3051                                                         peer->ibp_last_alive),
3052                                        conn->ibc_credits,
3053                                        conn->ibc_outstanding_credits,
3054                                        conn->ibc_reserved_credits);
3055                                 cfs_list_add(&conn->ibc_connd_list, &closes);
3056                         } else {
3057                                 cfs_list_add(&conn->ibc_connd_list,
3058                                              &checksends);
3059                         }
3060                         /* +ref for 'closes' or 'checksends' */
3061                         kiblnd_conn_addref(conn);
3062
3063                         cfs_spin_unlock(&conn->ibc_lock);
3064                 }
3065         }
3066
3067         cfs_read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
3068
3069         /* Handle timeout by closing the whole
3070          * connection. We can only be sure RDMA activity
3071          * has ceased once the QP has been modified. */
3072         while (!cfs_list_empty(&closes)) {
3073                 conn = cfs_list_entry(closes.next,
3074                                       kib_conn_t, ibc_connd_list);
3075                 cfs_list_del(&conn->ibc_connd_list);
3076                 kiblnd_close_conn(conn, -ETIMEDOUT);
3077                 kiblnd_conn_decref(conn);
3078         }
3079
3080         /* In case we have enough credits to return via a
3081          * NOOP, but there were no non-blocking tx descs
3082          * free to do it last time... */
3083         while (!cfs_list_empty(&checksends)) {
3084                 conn = cfs_list_entry(checksends.next,
3085                                       kib_conn_t, ibc_connd_list);
3086                 cfs_list_del(&conn->ibc_connd_list);
3087                 kiblnd_check_sends(conn);
3088                 kiblnd_conn_decref(conn);
3089         }
3090 }
3091
3092 void
3093 kiblnd_disconnect_conn (kib_conn_t *conn)
3094 {
3095         LASSERT (!cfs_in_interrupt());
3096         LASSERT (current == kiblnd_data.kib_connd);
3097         LASSERT (conn->ibc_state == IBLND_CONN_CLOSING);
3098
3099         rdma_disconnect(conn->ibc_cmid);
3100         kiblnd_finalise_conn(conn);
3101
3102         kiblnd_peer_notify(conn->ibc_peer);
3103 }
3104
3105 int
3106 kiblnd_connd (void *arg)
3107 {
3108         cfs_waitlink_t     wait;
3109         unsigned long      flags;
3110         kib_conn_t        *conn;
3111         int                timeout;
3112         int                i;
3113         int                dropped_lock;
3114         int                peer_index = 0;
3115         unsigned long      deadline = jiffies;
3116
3117         cfs_daemonize ("kiblnd_connd");
3118         cfs_block_allsigs ();
3119
3120         cfs_waitlink_init (&wait);
3121         kiblnd_data.kib_connd = current;
3122
3123         cfs_spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
3124
3125         while (!kiblnd_data.kib_shutdown) {
3126
3127                 dropped_lock = 0;
3128
3129                 if (!cfs_list_empty (&kiblnd_data.kib_connd_zombies)) {
3130                         conn = cfs_list_entry(kiblnd_data. \
3131                                               kib_connd_zombies.next,
3132                                               kib_conn_t, ibc_list);
3133                         cfs_list_del(&conn->ibc_list);
3134
3135                         cfs_spin_unlock_irqrestore (&kiblnd_data.kib_connd_lock,
3136                                                    flags);
3137                         dropped_lock = 1;
3138
3139                         kiblnd_destroy_conn(conn);
3140
3141                         cfs_spin_lock_irqsave (&kiblnd_data.kib_connd_lock,
3142                                                flags);
3143                 }
3144
3145                 if (!cfs_list_empty (&kiblnd_data.kib_connd_conns)) {
3146                         conn = cfs_list_entry (kiblnd_data.kib_connd_conns.next,
3147                                                kib_conn_t, ibc_list);
3148                         cfs_list_del(&conn->ibc_list);
3149
3150                         cfs_spin_unlock_irqrestore (&kiblnd_data.kib_connd_lock,
3151                                                     flags);
3152                         dropped_lock = 1;
3153
3154                         kiblnd_disconnect_conn(conn);
3155                         kiblnd_conn_decref(conn);
3156
3157                         cfs_spin_lock_irqsave (&kiblnd_data.kib_connd_lock,
3158                                                flags);
3159                 }
3160
3161                 /* careful with the jiffy wrap... */
3162                 timeout = (int)(deadline - jiffies);
3163                 if (timeout <= 0) {
3164                         const int n = 4;
3165                         const int p = 1;
3166                         int       chunk = kiblnd_data.kib_peer_hash_size;
3167
3168                         cfs_spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
3169                         dropped_lock = 1;
3170
3171                         /* Time to check for RDMA timeouts on a few more
3172                          * peers: I do checks every 'p' seconds on a
3173                          * proportion of the peer table and I need to check
3174                          * every connection 'n' times within a timeout
3175                          * interval, to ensure I detect a timeout on any
3176                          * connection within (n+1)/n times the timeout
3177                          * interval. */
3178
3179                         if (*kiblnd_tunables.kib_timeout > n * p)
3180                                 chunk = (chunk * n * p) /
3181                                         *kiblnd_tunables.kib_timeout;
3182                         if (chunk == 0)
3183                                 chunk = 1;
3184
3185                         for (i = 0; i < chunk; i++) {
3186                                 kiblnd_check_conns(peer_index);
3187                                 peer_index = (peer_index + 1) %
3188                                              kiblnd_data.kib_peer_hash_size;
3189                         }
3190
3191                         deadline += p * CFS_HZ;
3192                         cfs_spin_lock_irqsave(&kiblnd_data.kib_connd_lock,
3193                                               flags);
3194                 }
3195
3196                 if (dropped_lock)
3197                         continue;
3198
3199                 /* Nothing to do for 'timeout'  */
3200                 cfs_set_current_state (CFS_TASK_INTERRUPTIBLE);
3201                 cfs_waitq_add (&kiblnd_data.kib_connd_waitq, &wait);
3202                 cfs_spin_unlock_irqrestore (&kiblnd_data.kib_connd_lock, flags);
3203
3204                 cfs_waitq_timedwait(&wait, CFS_TASK_INTERRUPTIBLE, timeout);
3205
3206                 cfs_set_current_state (CFS_TASK_RUNNING);
3207                 cfs_waitq_del (&kiblnd_data.kib_connd_waitq, &wait);
3208                 cfs_spin_lock_irqsave (&kiblnd_data.kib_connd_lock, flags);
3209         }
3210
3211         cfs_spin_unlock_irqrestore (&kiblnd_data.kib_connd_lock, flags);
3212
3213         kiblnd_thread_fini();
3214         return (0);
3215 }
3216
3217 void
3218 kiblnd_qp_event(struct ib_event *event, void *arg)
3219 {
3220         kib_conn_t *conn = arg;
3221
3222         switch (event->event) {
3223         case IB_EVENT_COMM_EST:
3224                 CDEBUG(D_NET, "%s established\n",
3225                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
3226                 return;
3227
3228         default:
3229                 CERROR("%s: Async QP event type %d\n",
3230                        libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
3231                 return;
3232         }
3233 }
3234
3235 void
3236 kiblnd_complete (struct ib_wc *wc)
3237 {
3238         switch (kiblnd_wreqid2type(wc->wr_id)) {
3239         default:
3240                 LBUG();
3241
3242         case IBLND_WID_RDMA:
3243                 /* We only get RDMA completion notification if it fails.  All
3244                  * subsequent work items, including the final SEND will fail
3245                  * too.  However we can't print out any more info about the
3246                  * failing RDMA because 'tx' might be back on the idle list or
3247                  * even reused already if we didn't manage to post all our work
3248                  * items */
3249                 CNETERR("RDMA (tx: %p) failed: %d\n",
3250                         kiblnd_wreqid2ptr(wc->wr_id), wc->status);
3251                 return;
3252
3253         case IBLND_WID_TX:
3254                 kiblnd_tx_complete(kiblnd_wreqid2ptr(wc->wr_id), wc->status);
3255                 return;
3256
3257         case IBLND_WID_RX:
3258                 kiblnd_rx_complete(kiblnd_wreqid2ptr(wc->wr_id), wc->status,
3259                                    wc->byte_len);
3260                 return;
3261         }
3262 }
3263
3264 void
3265 kiblnd_cq_completion (struct ib_cq *cq, void *arg)
3266 {
3267         /* NB I'm not allowed to schedule this conn once its refcount has
3268          * reached 0.  Since fundamentally I'm racing with scheduler threads
3269          * consuming my CQ I could be called after all completions have
3270          * occurred.  But in this case, ibc_nrx == 0 && ibc_nsends_posted == 0
3271          * and this CQ is about to be destroyed so I NOOP. */
3272         kib_conn_t     *conn = (kib_conn_t *)arg;
3273         unsigned long   flags;
3274
3275         LASSERT (cq == conn->ibc_cq);
3276
3277         cfs_spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);
3278
3279         conn->ibc_ready = 1;
3280
3281         if (!conn->ibc_scheduled &&
3282             (conn->ibc_nrx > 0 ||
3283              conn->ibc_nsends_posted > 0)) {
3284                 kiblnd_conn_addref(conn); /* +1 ref for sched_conns */
3285                 conn->ibc_scheduled = 1;
3286                 cfs_list_add_tail(&conn->ibc_sched_list,
3287                                   &kiblnd_data.kib_sched_conns);
3288                 cfs_waitq_signal(&kiblnd_data.kib_sched_waitq);
3289         }
3290
3291         cfs_spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock, flags);
3292 }
3293
3294 void
3295 kiblnd_cq_event(struct ib_event *event, void *arg)
3296 {
3297         kib_conn_t *conn = arg;
3298
3299         CERROR("%s: async CQ event type %d\n",
3300                libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
3301 }
3302
3303 int
3304 kiblnd_scheduler(void *arg)
3305 {
3306         long            id = (long)arg;
3307         cfs_waitlink_t  wait;
3308         char            name[16];
3309         unsigned long   flags;
3310         kib_conn_t     *conn;
3311         struct ib_wc    wc;
3312         int             rc;
3313         int             did_something;
3314         int             busy_loops = 0;
3315
3316         snprintf(name, sizeof(name), "kiblnd_sd_%02ld", id);
3317         cfs_daemonize(name);
3318         cfs_block_allsigs();
3319
3320         cfs_waitlink_init(&wait);
3321
3322         cfs_spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);
3323
3324         while (!kiblnd_data.kib_shutdown) {
3325                 if (busy_loops++ >= IBLND_RESCHED) {
3326                         cfs_spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock,
3327                                                    flags);
3328
3329                         cfs_cond_resched();
3330                         busy_loops = 0;
3331
3332                         cfs_spin_lock_irqsave(&kiblnd_data.kib_sched_lock,
3333                                               flags);
3334                 }
3335
3336                 did_something = 0;
3337
3338                 if (!cfs_list_empty(&kiblnd_data.kib_sched_conns)) {
3339                         conn = cfs_list_entry(kiblnd_data.kib_sched_conns.next,
3340                                               kib_conn_t, ibc_sched_list);
3341                         /* take over kib_sched_conns' ref on conn... */
3342                         LASSERT(conn->ibc_scheduled);
3343                         cfs_list_del(&conn->ibc_sched_list);
3344                         conn->ibc_ready = 0;
3345
3346                         cfs_spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock,
3347                                                    flags);
3348
3349                         rc = ib_poll_cq(conn->ibc_cq, 1, &wc);
3350                         if (rc == 0) {
3351                                 rc = ib_req_notify_cq(conn->ibc_cq,
3352                                                       IB_CQ_NEXT_COMP);
3353                                 if (rc < 0) {
3354                                         CWARN("%s: ib_req_notify_cq failed: %d, "
3355                                               "closing connection\n",
3356                                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
3357                                         kiblnd_close_conn(conn, -EIO);
3358                                         kiblnd_conn_decref(conn);
3359                                         cfs_spin_lock_irqsave(&kiblnd_data. \
3360                                                               kib_sched_lock,
3361                                                               flags);
3362                                         continue;
3363                                 }
3364
3365                                 rc = ib_poll_cq(conn->ibc_cq, 1, &wc);
3366                         }
3367
3368                         if (rc < 0) {
3369                                 CWARN("%s: ib_poll_cq failed: %d, "
3370                                       "closing connection\n",
3371                                       libcfs_nid2str(conn->ibc_peer->ibp_nid),
3372                                                      rc);
3373                                 kiblnd_close_conn(conn, -EIO);
3374                                 kiblnd_conn_decref(conn);
3375                                 cfs_spin_lock_irqsave(&kiblnd_data. \
3376                                                       kib_sched_lock, flags);
3377                                 continue;
3378                         }
3379
3380                         cfs_spin_lock_irqsave(&kiblnd_data.kib_sched_lock,
3381                                               flags);
3382
3383                         if (rc != 0 || conn->ibc_ready) {
3384                                 /* There may be another completion waiting; get
3385                                  * another scheduler to check while I handle
3386                                  * this one... */
3387                                 kiblnd_conn_addref(conn); /* +1 ref for sched_conns */
3388                                 cfs_list_add_tail(&conn->ibc_sched_list,
3389                                                   &kiblnd_data.kib_sched_conns);
3390                                 cfs_waitq_signal(&kiblnd_data.kib_sched_waitq);
3391                         } else {
3392                                 conn->ibc_scheduled = 0;
3393                         }
3394
3395                         if (rc != 0) {
3396                                 cfs_spin_unlock_irqrestore(&kiblnd_data. \
3397                                                            kib_sched_lock,
3398                                                            flags);
3399
3400                                 kiblnd_complete(&wc);
3401
3402                                 cfs_spin_lock_irqsave(&kiblnd_data. \
3403                                                       kib_sched_lock,
3404                                                       flags);
3405                         }
3406
3407                         kiblnd_conn_decref(conn); /* ...drop my ref from above */
3408                         did_something = 1;
3409                 }
3410
3411                 if (did_something)
3412                         continue;
3413
3414                 cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
3415                 cfs_waitq_add_exclusive(&kiblnd_data.kib_sched_waitq, &wait);
3416                 cfs_spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock, flags);
3417
3418                 cfs_waitq_wait(&wait, CFS_TASK_INTERRUPTIBLE);
3419                 busy_loops = 0;
3420
3421                 cfs_waitq_del(&kiblnd_data.kib_sched_waitq, &wait);
3422                 cfs_set_current_state(CFS_TASK_RUNNING);
3423                 cfs_spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);
3424         }
3425
3426         cfs_spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock, flags);
3427
3428         kiblnd_thread_fini();
3429         return (0);
3430 }
3431
3432 int
3433 kiblnd_failover_thread(void *arg)
3434 {
3435         cfs_rwlock_t      *glock = &kiblnd_data.kib_global_lock;
3436         kib_dev_t         *dev;
3437         cfs_waitlink_t     wait;
3438         unsigned long      flags;
3439         int                rc;
3440
3441         LASSERT (*kiblnd_tunables.kib_dev_failover != 0);
3442
3443         cfs_daemonize ("kiblnd_failover");
3444         cfs_block_allsigs ();
3445
3446         cfs_waitlink_init(&wait);
3447         cfs_write_lock_irqsave(glock, flags);
3448
3449         while (!kiblnd_data.kib_shutdown) {
3450                 int     do_failover = 0;
3451                 int     long_sleep;
3452
3453                 cfs_list_for_each_entry(dev, &kiblnd_data.kib_failed_devs,
3454                                     ibd_fail_list) {
3455                         if (cfs_time_before(cfs_time_current(),
3456                                             dev->ibd_next_failover))
3457                                 continue;
3458                         do_failover = 1;
3459                         break;
3460                 }
3461
3462                 if (do_failover) {
3463                         cfs_list_del_init(&dev->ibd_fail_list);
3464                         dev->ibd_failover = 1;
3465                         cfs_write_unlock_irqrestore(glock, flags);
3466
3467                         rc = kiblnd_dev_failover(dev);
3468
3469                         cfs_write_lock_irqsave(glock, flags);
3470
3471                         LASSERT (dev->ibd_failover);
3472                         dev->ibd_failover = 0;
3473                         if (rc >= 0) { /* Device is OK or failover succeed */
3474                                 dev->ibd_next_failover = cfs_time_shift(3);
3475                                 continue;
3476                         }
3477
3478                         /* failed to failover, retry later */
3479                         dev->ibd_next_failover =
3480                                 cfs_time_shift(min(dev->ibd_failed_failover, 10));
3481                         if (kiblnd_dev_can_failover(dev)) {
3482                                 cfs_list_add_tail(&dev->ibd_fail_list,
3483                                               &kiblnd_data.kib_failed_devs);
3484                         }
3485
3486                         continue;
3487                 }
3488
3489                 /* long sleep if no more pending failover */
3490                 long_sleep = cfs_list_empty(&kiblnd_data.kib_failed_devs);
3491
3492                 cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
3493                 cfs_waitq_add(&kiblnd_data.kib_failover_waitq, &wait);
3494                 cfs_write_unlock_irqrestore(glock, flags);
3495
3496                 rc = schedule_timeout(long_sleep ? cfs_time_seconds(10) :
3497                                                    cfs_time_seconds(1));
3498                 cfs_set_current_state(CFS_TASK_RUNNING);
3499                 cfs_waitq_del(&kiblnd_data.kib_failover_waitq, &wait);
3500                 cfs_write_lock_irqsave(glock, flags);
3501
3502                 if (!long_sleep || rc != 0)
3503                         continue;
3504
3505                 /* have a long sleep, routine check all active devices,
3506                  * we need checking like this because if there is not active
3507                  * connection on the dev and no SEND from local, we may listen
3508                  * on wrong HCA for ever while there is a bonding failover */
3509                 cfs_list_for_each_entry(dev, &kiblnd_data.kib_devs, ibd_list) {
3510                         if (kiblnd_dev_can_failover(dev)) {
3511                                 cfs_list_add_tail(&dev->ibd_fail_list,
3512                                               &kiblnd_data.kib_failed_devs);
3513                         }
3514                 }
3515         }
3516
3517         cfs_write_unlock_irqrestore(glock, flags);
3518
3519         kiblnd_thread_fini();
3520         return 0;
3521 }