/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/o2iblnd/o2iblnd_cb.c
 *
 * Author: Eric Barton <eric@bartonsoftware.com>
 */

#include "o2iblnd.h"

char *
kiblnd_msgtype2str(int type)
{
        switch (type) {
        case IBLND_MSG_CONNREQ:
                return "CONNREQ";

        case IBLND_MSG_CONNACK:
                return "CONNACK";

        case IBLND_MSG_NOOP:
                return "NOOP";

        case IBLND_MSG_IMMEDIATE:
                return "IMMEDIATE";

        case IBLND_MSG_PUT_REQ:
                return "PUT_REQ";

        case IBLND_MSG_PUT_NAK:
                return "PUT_NAK";

        case IBLND_MSG_PUT_ACK:
                return "PUT_ACK";

        case IBLND_MSG_PUT_DONE:
                return "PUT_DONE";

        case IBLND_MSG_GET_REQ:
                return "GET_REQ";

        case IBLND_MSG_GET_DONE:
                return "GET_DONE";

        default:
                return "???";
        }
}

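/* Return a completed tx to the idle pool: unmap its DMA/FMR state, drop
 * its connection ref and finalise the (up to 2) LNet messages it
 * carries.  Must not be called in interrupt context. */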
void
kiblnd_tx_done (lnet_ni_t *ni, kib_tx_t *tx)
{
        lnet_msg_t *lntmsg[2];
        kib_net_t  *net = ni->ni_data;
        int         rc;
        int         i;

        LASSERT (net != NULL);
        LASSERT (!in_interrupt());
        LASSERT (!tx->tx_queued);               /* mustn't be queued for sending */
        LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting sent callback */
        LASSERT (!tx->tx_waiting);              /* mustn't be awaiting peer response */

#if IBLND_MAP_ON_DEMAND
        if (tx->tx_fmr != NULL) {
                rc = ib_fmr_pool_unmap(tx->tx_fmr);
                LASSERT (rc == 0);

                if (tx->tx_status != 0) {
                        rc = ib_flush_fmr_pool(net->ibn_fmrpool);
                        LASSERT (rc == 0);
                }

                tx->tx_fmr = NULL;
        }
#else
        if (tx->tx_nfrags != 0) {
                kiblnd_dma_unmap_sg(net->ibn_dev->ibd_cmid->device,
                                    tx->tx_frags, tx->tx_nfrags, tx->tx_dmadir);
                tx->tx_nfrags = 0;
        }
#endif
        /* tx may have up to 2 lnet msgs to finalise */
        lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
        lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
        rc = tx->tx_status;

        if (tx->tx_conn != NULL) {
                LASSERT (ni == tx->tx_conn->ibc_peer->ibp_ni);

                kiblnd_conn_decref(tx->tx_conn);
                tx->tx_conn = NULL;
        }

        tx->tx_nwrq = 0;
        tx->tx_status = 0;

        spin_lock(&net->ibn_tx_lock);

        list_add(&tx->tx_list, &net->ibn_idle_txs);

        spin_unlock(&net->ibn_tx_lock);

        /* delay finalize until my descs have been freed */
        for (i = 0; i < 2; i++) {
                if (lntmsg[i] == NULL)
                        continue;

                lnet_finalize(ni, lntmsg[i], rc);
        }
}

void
kiblnd_txlist_done (lnet_ni_t *ni, struct list_head *txlist, int status)
{
        kib_tx_t *tx;

        while (!list_empty (txlist)) {
                tx = list_entry (txlist->next, kib_tx_t, tx_list);

                list_del (&tx->tx_list);
                /* complete now */
                tx->tx_waiting = 0;
                tx->tx_status = status;
                kiblnd_tx_done(ni, tx);
        }
}

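/* Pull a tx descriptor off the per-net idle list and stamp it with a
 * fresh completion cookie.  Returns NULL if the pool is empty. */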
kib_tx_t *
kiblnd_get_idle_tx (lnet_ni_t *ni)
{
        kib_net_t     *net = ni->ni_data;
        kib_tx_t      *tx;

        LASSERT (net != NULL);

        spin_lock(&net->ibn_tx_lock);

        if (list_empty(&net->ibn_idle_txs)) {
                spin_unlock(&net->ibn_tx_lock);
                return NULL;
        }

        tx = list_entry(net->ibn_idle_txs.next, kib_tx_t, tx_list);
        list_del(&tx->tx_list);

        /* Allocate a new completion cookie.  It might not be needed,
         * but we've got a lock right now and we're unlikely to
         * wrap... */
        tx->tx_cookie = kiblnd_data.kib_next_tx_cookie++;

        spin_unlock(&net->ibn_tx_lock);

        LASSERT (tx->tx_nwrq == 0);
        LASSERT (!tx->tx_queued);
        LASSERT (tx->tx_sending == 0);
        LASSERT (!tx->tx_waiting);
        LASSERT (tx->tx_status == 0);
        LASSERT (tx->tx_conn == NULL);
        LASSERT (tx->tx_lntmsg[0] == NULL);
        LASSERT (tx->tx_lntmsg[1] == NULL);
#if IBLND_MAP_ON_DEMAND
        LASSERT (tx->tx_fmr == NULL);
#else
        LASSERT (tx->tx_nfrags == 0);
#endif

        return tx;
}

void
kiblnd_drop_rx (kib_rx_t *rx)
{
        kib_conn_t         *conn = rx->rx_conn;
        unsigned long       flags;

        spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);
        LASSERT (conn->ibc_nrx > 0);
        conn->ibc_nrx--;
        spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock, flags);

        kiblnd_conn_decref(conn);
}

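/* Post 'rx' back on the connection's receive queue.  'credit' says what
 * the recycled buffer should return to the peer: a peer credit
 * (IBLND_POSTRX_PEER_CREDIT), a reserved credit
 * (IBLND_POSTRX_RSRVD_CREDIT), or nothing (IBLND_POSTRX_NO_CREDIT). */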
int
kiblnd_post_rx (kib_rx_t *rx, int credit)
{
        kib_conn_t         *conn = rx->rx_conn;
        kib_net_t          *net = conn->ibc_peer->ibp_ni->ni_data;
        struct ib_recv_wr  *bad_wrq;
        int                 rc;

        LASSERT (net != NULL);
        LASSERT (!in_interrupt());
        LASSERT (credit == IBLND_POSTRX_NO_CREDIT ||
                 credit == IBLND_POSTRX_PEER_CREDIT ||
                 credit == IBLND_POSTRX_RSRVD_CREDIT);

        rx->rx_sge.length = IBLND_MSG_SIZE;
        rx->rx_sge.lkey = net->ibn_dev->ibd_mr->lkey;
        rx->rx_sge.addr = rx->rx_msgaddr;

        rx->rx_wrq.next = NULL;
        rx->rx_wrq.sg_list = &rx->rx_sge;
        rx->rx_wrq.num_sge = 1;
        rx->rx_wrq.wr_id = kiblnd_ptr2wreqid(rx, IBLND_WID_RX);

        LASSERT (conn->ibc_state >= IBLND_CONN_INIT);
        LASSERT (rx->rx_nob >= 0);              /* not posted */

        if (conn->ibc_state > IBLND_CONN_ESTABLISHED) {
                kiblnd_drop_rx(rx);             /* No more posts for this rx */
                return 0;
        }

        rx->rx_nob = -1;                        /* flag posted */

        rc = ib_post_recv(conn->ibc_cmid->qp, &rx->rx_wrq, &bad_wrq);

        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) /* Initial post */
                return rc;

        if (rc != 0) {
                CERROR("Can't post rx for %s: %d\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
                kiblnd_close_conn(conn, rc);
                kiblnd_drop_rx(rx);             /* No more posts for this rx */
                return rc;
        }

        if (credit == IBLND_POSTRX_NO_CREDIT)
                return 0;

        spin_lock(&conn->ibc_lock);
        if (credit == IBLND_POSTRX_PEER_CREDIT)
                conn->ibc_outstanding_credits++;
        else
                conn->ibc_reserved_credits++;
        spin_unlock(&conn->ibc_lock);

        kiblnd_check_sends(conn);
        return 0;
}

kib_tx_t *
kiblnd_find_waiting_tx_locked(kib_conn_t *conn, int txtype, __u64 cookie)
{
        struct list_head   *tmp;

        list_for_each(tmp, &conn->ibc_active_txs) {
                kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);

                LASSERT (!tx->tx_queued);
                LASSERT (tx->tx_sending != 0 || tx->tx_waiting);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_waiting &&
                    tx->tx_msg->ibm_type == txtype)
                        return tx;

                CWARN("Bad completion: %swaiting, type %x (wanted %x)\n",
                      tx->tx_waiting ? "" : "NOT ",
                      tx->tx_msg->ibm_type, txtype);
        }
        return NULL;
}

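/* Match an incoming completion message to the active tx that is waiting
 * for it; close the connection with -EPROTO if no match is found. */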
void
kiblnd_handle_completion(kib_conn_t *conn, int txtype, int status, __u64 cookie)
{
        kib_tx_t    *tx;
        lnet_ni_t   *ni = conn->ibc_peer->ibp_ni;
        int          idle;

        spin_lock(&conn->ibc_lock);

        tx = kiblnd_find_waiting_tx_locked(conn, txtype, cookie);
        if (tx == NULL) {
                spin_unlock(&conn->ibc_lock);

                CWARN("Unmatched completion type %x cookie "LPX64" from %s\n",
                      txtype, cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                kiblnd_close_conn(conn, -EPROTO);
                return;
        }

        if (tx->tx_status == 0) {               /* success so far */
                if (status < 0) {               /* failed? */
                        tx->tx_status = status;
                } else if (txtype == IBLND_MSG_GET_REQ) {
                        lnet_set_reply_msg_len(ni, tx->tx_lntmsg[1], status);
                }
        }

        tx->tx_waiting = 0;

        idle = !tx->tx_queued && (tx->tx_sending == 0);
        if (idle)
                list_del(&tx->tx_list);

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kiblnd_tx_done(ni, tx);
}

void
kiblnd_send_completion (kib_conn_t *conn, int type, int status, __u64 cookie)
{
        lnet_ni_t   *ni = conn->ibc_peer->ibp_ni;
        kib_tx_t    *tx = kiblnd_get_idle_tx(ni);

        if (tx == NULL) {
                CERROR("Can't get tx for completion %x for %s\n",
                       type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                return;
        }

        tx->tx_msg->ibm_u.completion.ibcm_status = status;
        tx->tx_msg->ibm_u.completion.ibcm_cookie = cookie;
        kiblnd_init_tx_msg(ni, tx, type, sizeof(kib_completion_msg_t));

        kiblnd_queue_tx(tx, conn);
}

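/* Dispatch a successfully received message: bank any flow-control
 * credits it carries, hand payload-bearing messages to lnet_parse(),
 * match completion messages to their waiting txs, and finally re-post
 * the receive buffer unless lnet_parse() has taken ownership of it. */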
void
kiblnd_handle_rx (kib_rx_t *rx)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        lnet_ni_t    *ni = conn->ibc_peer->ibp_ni;
        int           credits = msg->ibm_credits;
        kib_tx_t     *tx;
        int           rc = 0;
        int           rc2;
        int           post_credit;

        LASSERT (conn->ibc_state >= IBLND_CONN_ESTABLISHED);

        CDEBUG (D_NET, "Received %x[%d] from %s\n",
                msg->ibm_type, credits, libcfs_nid2str(conn->ibc_peer->ibp_nid));

        if (credits != 0) {
                /* Have I received credits that will let me send? */
                spin_lock(&conn->ibc_lock);

                if (conn->ibc_credits + credits > IBLND_MSG_QUEUE_SIZE) {
                        rc2 = conn->ibc_credits;
                        spin_unlock(&conn->ibc_lock);

                        CERROR("Bad credits from %s: %d + %d > %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid),
                               rc2, credits, IBLND_MSG_QUEUE_SIZE);

                        kiblnd_close_conn(conn, -EPROTO);
                        kiblnd_post_rx(rx, IBLND_POSTRX_NO_CREDIT);
                        return;
                }

                conn->ibc_credits += credits;

                /* This ensures the credit taken by NOOP can be returned */
                if (msg->ibm_type == IBLND_MSG_NOOP)
                        conn->ibc_outstanding_credits++;

                spin_unlock(&conn->ibc_lock);
                kiblnd_check_sends(conn);
        }

        switch (msg->ibm_type) {
        default:
                CERROR("Bad IBLND message type %x from %s\n",
                       msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                post_credit = IBLND_POSTRX_NO_CREDIT;
                rc = -EPROTO;
                break;

        case IBLND_MSG_NOOP:
                if (credits != 0) /* credit already posted */
                        post_credit = IBLND_POSTRX_NO_CREDIT;
                else              /* a keepalive NOOP */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_IMMEDIATE:
                post_credit = IBLND_POSTRX_DONT_POST;
                rc = lnet_parse(ni, &msg->ibm_u.immediate.ibim_hdr,
                                msg->ibm_srcnid, rx, 0);
                if (rc < 0)                     /* repost on error */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_PUT_REQ:
                post_credit = IBLND_POSTRX_DONT_POST;
                rc = lnet_parse(ni, &msg->ibm_u.putreq.ibprm_hdr,
                                msg->ibm_srcnid, rx, 1);
                if (rc < 0)                     /* repost on error */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_PUT_NAK:
                CWARN ("PUT_NAK from %s\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                post_credit = IBLND_POSTRX_RSRVD_CREDIT;
                kiblnd_handle_completion(conn, IBLND_MSG_PUT_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBLND_MSG_PUT_ACK:
                post_credit = IBLND_POSTRX_RSRVD_CREDIT;

                spin_lock(&conn->ibc_lock);
                tx = kiblnd_find_waiting_tx_locked(conn, IBLND_MSG_PUT_REQ,
                                                   msg->ibm_u.putack.ibpam_src_cookie);
                if (tx != NULL)
                        list_del(&tx->tx_list);
                spin_unlock(&conn->ibc_lock);

                if (tx == NULL) {
                        CERROR("Unmatched PUT_ACK from %s\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        rc = -EPROTO;
                        break;
                }

                LASSERT (tx->tx_waiting);
                /* CAVEAT EMPTOR: I could be racing with tx_complete, but...
                 * (a) I can overwrite tx_msg since my peer has received it!
                 * (b) tx_waiting set tells tx_complete() it's not done. */

                tx->tx_nwrq = 0;                /* overwrite PUT_REQ */

                rc2 = kiblnd_init_rdma(ni, tx, IBLND_MSG_PUT_DONE,
                                       kiblnd_rd_size(&msg->ibm_u.putack.ibpam_rd),
                                       &msg->ibm_u.putack.ibpam_rd,
                                       msg->ibm_u.putack.ibpam_dst_cookie);
                if (rc2 < 0)
                        CERROR("Can't setup rdma for PUT to %s: %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc2);

                spin_lock(&conn->ibc_lock);
                tx->tx_waiting = 0;             /* clear waiting and queue atomically */
                kiblnd_queue_tx_locked(tx, conn);
                spin_unlock(&conn->ibc_lock);
                break;

        case IBLND_MSG_PUT_DONE:
                post_credit = IBLND_POSTRX_PEER_CREDIT;
                kiblnd_handle_completion(conn, IBLND_MSG_PUT_ACK,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBLND_MSG_GET_REQ:
                post_credit = IBLND_POSTRX_DONT_POST;
                rc = lnet_parse(ni, &msg->ibm_u.get.ibgm_hdr,
                                msg->ibm_srcnid, rx, 1);
                if (rc < 0)                     /* repost on error */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_GET_DONE:
                post_credit = IBLND_POSTRX_RSRVD_CREDIT;
                kiblnd_handle_completion(conn, IBLND_MSG_GET_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;
        }

        if (rc < 0)                             /* protocol error */
                kiblnd_close_conn(conn, rc);

        if (post_credit != IBLND_POSTRX_DONT_POST)
                kiblnd_post_rx(rx, post_credit);
}

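/* Completion handler for receive work requests: validate the message
 * and either handle it now or, if the connection is still being
 * established, stash it on ibc_early_rxs until the connection is up. */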
void
kiblnd_rx_complete (kib_rx_t *rx, int status, int nob)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        lnet_ni_t    *ni = conn->ibc_peer->ibp_ni;
        kib_net_t    *net = ni->ni_data;
        unsigned long flags;
        int           rc;
        int           err = -EIO;

        LASSERT (net != NULL);
        LASSERT (rx->rx_nob < 0);               /* was posted */
        rx->rx_nob = 0;                         /* isn't now */

        if (conn->ibc_state > IBLND_CONN_ESTABLISHED)
                goto ignore;

        if (status != IB_WC_SUCCESS) {
                CDEBUG(D_NETERROR, "Rx from %s failed: %d\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid), status);
                goto failed;
        }

        LASSERT (nob >= 0);
        rx->rx_nob = nob;

        rc = kiblnd_unpack_msg(msg, rx->rx_nob);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from %s\n",
                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                goto failed;
        }

        if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
            msg->ibm_dstnid != ni->ni_nid ||
            msg->ibm_srcstamp != conn->ibc_incarnation ||
            msg->ibm_dststamp != net->ibn_incarnation) {
                CERROR ("Stale rx from %s\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
                err = -ESTALE;
                goto failed;
        }

        /* set time last known alive */
        kiblnd_peer_alive(conn->ibc_peer);

        /* racing with connection establishment/teardown! */

        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
                write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
                /* must check holding global lock to eliminate race */
                if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
                        list_add_tail(&rx->rx_list, &conn->ibc_early_rxs);
                        write_unlock_irqrestore(&kiblnd_data.kib_global_lock,
                                                flags);
                        return;
                }
                write_unlock_irqrestore(&kiblnd_data.kib_global_lock,
                                        flags);
        }
        kiblnd_handle_rx(rx);
        return;

 failed:
        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        kiblnd_close_conn(conn, err);
 ignore:
        kiblnd_drop_rx(rx);                     /* Don't re-post rx. */
}

struct page *
kiblnd_kvaddr_to_page (unsigned long vaddr)
{
        struct page *page;

        if (vaddr >= VMALLOC_START &&
            vaddr < VMALLOC_END) {
                page = vmalloc_to_page ((void *)vaddr);
                LASSERT (page != NULL);
                return page;
        }
#ifdef CONFIG_HIGHMEM
        if (vaddr >= PKMAP_BASE &&
            vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE)) {
                /* No highmem kernel virtual addresses expected here:
                 * highmem pages are only used for bulk (kiov) I/O */
                CERROR("Can't find page for address in highmem\n");
                LBUG();
        }
#endif
        page = virt_to_page (vaddr);
        LASSERT (page != NULL);
        return page;
}

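/* When not mapping on demand, RDMA buffers are described by a
 * scatter/gather list built directly from the payload fragments and
 * DMA-mapped against the device's global memory region. */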
#if !IBLND_MAP_ON_DEMAND
int
kiblnd_setup_rd_iov(lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd,
                    unsigned int niov, struct iovec *iov, int offset, int nob)
{
        struct scatterlist *sg;
        int                 i;
        int                 fragnob;
        unsigned long       vaddr;
        struct page        *page;
        int                 page_offset;
        kib_net_t          *net = ni->ni_data;

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (net != NULL);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        sg = tx->tx_frags;
        do {
                LASSERT (niov > 0);

                vaddr = ((unsigned long)iov->iov_base) + offset;
                page_offset = vaddr & (PAGE_SIZE - 1);
                page = kiblnd_kvaddr_to_page(vaddr);
                if (page == NULL) {
                        CERROR ("Can't find page\n");
                        return -EFAULT;
                }

                fragnob = min((int)(iov->iov_len - offset), nob);
                fragnob = min(fragnob, (int)PAGE_SIZE - page_offset);

                sg_set_page(sg, page, fragnob, page_offset);
                sg++;

                if (offset + fragnob < iov->iov_len) {
                        offset += fragnob;
                } else {
                        offset = 0;
                        iov++;
                        niov--;
                }
                nob -= fragnob;
        } while (nob > 0);

        /* If rd is not tx_rd, it's going to get sent to a peer and I'm the
         * RDMA sink */
        tx->tx_nfrags = sg - tx->tx_frags;
        tx->tx_dmadir = (rd != tx->tx_rd) ? DMA_FROM_DEVICE : DMA_TO_DEVICE;

        rd->rd_nfrags = kiblnd_dma_map_sg(net->ibn_dev->ibd_cmid->device,
                                          tx->tx_frags, tx->tx_nfrags,
                                          tx->tx_dmadir);
        rd->rd_key    = (rd != tx->tx_rd) ?
                        net->ibn_dev->ibd_mr->rkey : net->ibn_dev->ibd_mr->lkey;

        for (i = 0; i < rd->rd_nfrags; i++) {
                rd->rd_frags[i].rf_nob  = kiblnd_sg_dma_len(
                        net->ibn_dev->ibd_cmid->device, &tx->tx_frags[i]);
                rd->rd_frags[i].rf_addr = kiblnd_sg_dma_address(
                        net->ibn_dev->ibd_cmid->device, &tx->tx_frags[i]);
        }

        return 0;
}

int
kiblnd_setup_rd_kiov (lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd,
                      int nkiov, lnet_kiov_t *kiov, int offset, int nob)
{
        struct scatterlist *sg;
        int                 i;
        int                 fragnob;
        kib_net_t          *net = ni->ni_data;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (net != NULL);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        sg = tx->tx_frags;
        do {
                LASSERT (nkiov > 0);

                fragnob = min((int)(kiov->kiov_len - offset), nob);

                memset(sg, 0, sizeof(*sg));
                sg_set_page(sg, kiov->kiov_page, fragnob,
                            kiov->kiov_offset + offset);
                sg++;

                offset = 0;
                kiov++;
                nkiov--;
                nob -= fragnob;
        } while (nob > 0);

        /* If rd is not tx_rd, it's going to get sent to a peer and I'm the
         * RDMA sink */
        tx->tx_nfrags = sg - tx->tx_frags;
        tx->tx_dmadir = (rd != tx->tx_rd) ? DMA_FROM_DEVICE : DMA_TO_DEVICE;

        rd->rd_nfrags = kiblnd_dma_map_sg(net->ibn_dev->ibd_cmid->device,
                                          tx->tx_frags, tx->tx_nfrags, tx->tx_dmadir);
        rd->rd_key    = (rd != tx->tx_rd) ?
                        net->ibn_dev->ibd_mr->rkey : net->ibn_dev->ibd_mr->lkey;

        for (i = 0; i < tx->tx_nfrags; i++) {
                rd->rd_frags[i].rf_nob  = kiblnd_sg_dma_len(
                        net->ibn_dev->ibd_cmid->device, &tx->tx_frags[i]);
                rd->rd_frags[i].rf_addr = kiblnd_sg_dma_address(
                        net->ibn_dev->ibd_cmid->device, &tx->tx_frags[i]);
#if 0
                CDEBUG(D_WARNING,"frag[%d]: "LPX64" for %d\n",
                       i, rd->rd_frags[i].rf_addr, rd->rd_frags[i].rf_nob);
#endif
        }

        return 0;
}
#else
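/* Map-on-demand path: the physical pages collected in tx->tx_pages are
 * mapped through the FMR pool, yielding a single virtually-contiguous
 * RDMA region described by rd_addr/rd_nob/rd_key. */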
int
kiblnd_map_tx (lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd,
               int npages, unsigned long page_offset, int nob)
{
        struct ib_pool_fmr *fmr;
        kib_net_t          *net = ni->ni_data;

        LASSERT (net != NULL);
        LASSERT (tx->tx_fmr == NULL);
        LASSERT (page_offset < PAGE_SIZE);
        LASSERT (npages >= (1 + ((page_offset + nob - 1)>>PAGE_SHIFT)));
        LASSERT (npages <= LNET_MAX_IOV);

        rd->rd_addr = 0;

        fmr = ib_fmr_pool_map_phys(net->ibn_fmrpool, tx->tx_pages,
                                   npages, rd->rd_addr);
        if (IS_ERR(fmr)) {
                CERROR ("Can't map %d pages: %ld\n", npages, PTR_ERR(fmr));
                return PTR_ERR(fmr);
        }

        /* If rd is not tx_rd, it's going to get sent to a peer, who will need
         * the rkey */

        rd->rd_key = (rd != tx->tx_rd) ? fmr->fmr->rkey : fmr->fmr->lkey;
        rd->rd_nob = nob;

        tx->tx_fmr = fmr;
        return 0;
}

int
kiblnd_setup_rd_iov (lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd,
                     unsigned int niov, struct iovec *iov, int offset, int nob)
{
        int           resid;
        int           fragnob;
        struct page  *page;
        int           npages;
        unsigned long page_offset;
        unsigned long vaddr;

        LASSERT (nob > 0);
        LASSERT (niov > 0);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR ("Can't map multiple vaddr fragments\n");
                return (-EMSGSIZE);
        }

        vaddr = ((unsigned long)iov->iov_base) + offset;

        page_offset = vaddr & (PAGE_SIZE - 1);
        resid = nob;
        npages = 0;

        do {
                LASSERT (npages < LNET_MAX_IOV);

                page = kiblnd_kvaddr_to_page(vaddr);
                if (page == NULL) {
                        CERROR("Can't find page for %lu\n", vaddr);
                        return -EFAULT;
                }

                tx->tx_pages[npages++] = lnet_page2phys(page);

                fragnob = PAGE_SIZE - (vaddr & (PAGE_SIZE - 1));
                vaddr += fragnob;
                resid -= fragnob;

        } while (resid > 0);

        return kiblnd_map_tx(ni, tx, rd, npages, page_offset, nob);
}

int
kiblnd_setup_rd_kiov (lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd,
                      int nkiov, lnet_kiov_t *kiov, int offset, int nob)
{
        int            resid;
        int            npages;
        unsigned long  page_offset;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (nkiov <= LNET_MAX_IOV);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        page_offset = kiov->kiov_offset + offset;

        resid = offset + nob;
        npages = 0;

        do {
                LASSERT (npages < LNET_MAX_IOV);
                LASSERT (nkiov > 0);

                if ((npages > 0 && kiov->kiov_offset != 0) ||
                    (resid > kiov->kiov_len &&
                     (kiov->kiov_offset + kiov->kiov_len) != PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR ("Can't make payload contiguous in I/O VM: "
                                "page %d, offset %d, len %d\n",
                                npages, kiov->kiov_offset, kiov->kiov_len);

                        return -EINVAL;
                }

                tx->tx_pages[npages++] = lnet_page2phys(kiov->kiov_page);
                resid -= kiov->kiov_len;
                kiov++;
                nkiov--;
        } while (resid > 0);

        return kiblnd_map_tx(ni, tx, rd, npages, page_offset, nob);
}
#endif

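/* Work the connection's send queues: promote reserved-credit txs, queue
 * a NOOP if credits need returning to the peer, then post sends for as
 * long as credits and the concurrent-sends limit allow. */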
void
kiblnd_check_sends (kib_conn_t *conn)
{
        kib_tx_t          *tx;
        lnet_ni_t         *ni = conn->ibc_peer->ibp_ni;
        int                rc;
        int                consume_cred = 0;
        struct ib_send_wr *bad_wrq;
        int                done;

        /* Don't send anything until after the connection is established */
        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
                CDEBUG(D_NET, "%s too soon\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                return;
        }

        spin_lock(&conn->ibc_lock);

        LASSERT (conn->ibc_nsends_posted <=
                 *kiblnd_tunables.kib_concurrent_sends);
        LASSERT (conn->ibc_reserved_credits >= 0);

        while (conn->ibc_reserved_credits > 0 &&
               !list_empty(&conn->ibc_tx_queue_rsrvd)) {
                tx = list_entry(conn->ibc_tx_queue_rsrvd.next,
                                kib_tx_t, tx_list);
                list_del(&tx->tx_list);
                list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
                conn->ibc_reserved_credits--;
        }

        if (kiblnd_send_noop(conn)) {
                spin_unlock(&conn->ibc_lock);

                tx = kiblnd_get_idle_tx(ni);
                if (tx != NULL)
                        kiblnd_init_tx_msg(ni, tx, IBLND_MSG_NOOP, 0);

                spin_lock(&conn->ibc_lock);

                if (tx != NULL)
                        kiblnd_queue_tx_locked(tx, conn);
        }

        for (;;) {
                if (!list_empty(&conn->ibc_tx_queue_nocred)) {
                        tx = list_entry(conn->ibc_tx_queue_nocred.next,
                                        kib_tx_t, tx_list);
                        consume_cred = 0;
                } else if (!list_empty(&conn->ibc_tx_noops)) {
                        tx = list_entry(conn->ibc_tx_noops.next,
                                        kib_tx_t, tx_list);
                        consume_cred = 1;
                } else if (!list_empty(&conn->ibc_tx_queue)) {
                        tx = list_entry(conn->ibc_tx_queue.next,
                                        kib_tx_t, tx_list);
                        consume_cred = 1;
                } else {
                        /* nothing to send right now */
                        break;
                }

                LASSERT (tx->tx_queued);
                /* We rely on this for QP sizing */
                LASSERT (tx->tx_nwrq > 0 &&
                         tx->tx_nwrq <= 1 + IBLND_MAX_RDMA_FRAGS);

                LASSERT (conn->ibc_outstanding_credits >= 0);
                LASSERT (conn->ibc_outstanding_credits <= IBLND_MSG_QUEUE_SIZE);
                LASSERT (conn->ibc_credits >= 0);
                LASSERT (conn->ibc_credits <= IBLND_MSG_QUEUE_SIZE);

                if (conn->ibc_nsends_posted ==
                    *kiblnd_tunables.kib_concurrent_sends) {
                        /* tx completions outstanding... */
                        CDEBUG(D_NET, "%s: posted enough\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        break;
                }

                if (consume_cred) {
                        if (conn->ibc_credits == 0) {   /* no credits */
                                CDEBUG(D_NET, "%s: no credits\n",
                                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                                break; /* NB ibc_tx_queue_nocred checked */
                        }

                        /* Last credit reserved for NOOP */
                        if (conn->ibc_credits == 1 &&
                            tx->tx_msg->ibm_type != IBLND_MSG_NOOP) {
                                CDEBUG(D_NET, "%s: not using last credit\n",
                                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                                break; /* NB ibc_tx_noops checked */
                        }
                }

                list_del(&tx->tx_list);
                tx->tx_queued = 0;

                /* NB don't drop ibc_lock before bumping tx_sending */

                if (tx->tx_msg->ibm_type == IBLND_MSG_NOOP &&
                    !kiblnd_send_noop(conn)) {
                        /* redundant NOOP */
                        spin_unlock(&conn->ibc_lock);
                        kiblnd_tx_done(ni, tx);
                        spin_lock(&conn->ibc_lock);
                        CDEBUG(D_NET, "%s: redundant noop\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        continue;
                }

                kiblnd_pack_msg(ni, tx->tx_msg, conn->ibc_outstanding_credits,
                                conn->ibc_peer->ibp_nid, conn->ibc_incarnation);

                conn->ibc_outstanding_credits = 0;
                conn->ibc_nsends_posted++;
                if (consume_cred)
                        conn->ibc_credits--;

                /* CAVEAT EMPTOR!  This tx could be the PUT_DONE of an RDMA
                 * PUT.  If so, it was first queued here as a PUT_REQ, sent and
                 * stashed on ibc_active_txs, matched by an incoming PUT_ACK,
                 * and then re-queued here.  It's (just) possible that
                 * tx_sending is non-zero if we've not done the tx_complete()
                 * from the first send; hence the ++ rather than = below. */
                tx->tx_sending++;

                list_add (&tx->tx_list, &conn->ibc_active_txs);
#if 0
                {
                        int i;

                        for (i = 0; i < tx->tx_nwrq - 1; i++) {
                                LASSERT (tx->tx_wrq[i].opcode == IB_WR_RDMA_WRITE);
                                LASSERT (tx->tx_wrq[i].next == &tx->tx_wrq[i+1]);
                                LASSERT (tx->tx_wrq[i].sg_list == &tx->tx_sge[i]);

                                CDEBUG(D_WARNING, "WORK[%d]: RDMA "LPX64
                                       " for %d k %x -> "LPX64" k %x\n", i,
                                       tx->tx_wrq[i].sg_list->addr,
                                       tx->tx_wrq[i].sg_list->length,
                                       tx->tx_wrq[i].sg_list->lkey,
                                       tx->tx_wrq[i].wr.rdma.remote_addr,
                                       tx->tx_wrq[i].wr.rdma.rkey);
                        }

                        LASSERT (tx->tx_wrq[i].opcode == IB_WR_SEND);
                        LASSERT (tx->tx_wrq[i].next == NULL);
                        LASSERT (tx->tx_wrq[i].sg_list == &tx->tx_sge[i]);

                        CDEBUG(D_WARNING, "WORK[%d]: SEND "LPX64" for %d k %x\n", i,
                               tx->tx_wrq[i].sg_list->addr,
                               tx->tx_wrq[i].sg_list->length,
                               tx->tx_wrq[i].sg_list->lkey);
                }
#endif
                /* I'm still holding ibc_lock! */
                if (conn->ibc_state != IBLND_CONN_ESTABLISHED)
                        rc = -ECONNABORTED;
                else
                        rc = ib_post_send(conn->ibc_cmid->qp, tx->tx_wrq, &bad_wrq);

                conn->ibc_last_send = jiffies;

                if (rc != 0) {
                        /* NB credits are transferred in the actual
                         * message, which can only be the last work item */
                        conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
                        if (consume_cred)
                                conn->ibc_credits++;
                        conn->ibc_nsends_posted--;

                        tx->tx_status = rc;
                        tx->tx_waiting = 0;
                        tx->tx_sending--;

                        done = (tx->tx_sending == 0);
                        if (done)
                                list_del (&tx->tx_list);

                        spin_unlock(&conn->ibc_lock);

                        if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
                                CERROR("Error %d posting transmit to %s\n",
                                       rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        else
                                CDEBUG(D_NET, "Error %d posting transmit to %s\n",
                                       rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));

                        kiblnd_close_conn(conn, rc);

                        if (done)
                                kiblnd_tx_done(ni, tx);
                        return;
                }
        }

        spin_unlock(&conn->ibc_lock);
}

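/* Completion handler for send work requests: drop tx_sending and, if
 * the tx is neither queued nor awaiting a peer response, complete it. */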
void
kiblnd_tx_complete (kib_tx_t *tx, int status)
{
        int           failed = (status != IB_WC_SUCCESS);
        kib_conn_t   *conn = tx->tx_conn;
        int           idle;

        LASSERT (tx->tx_sending > 0);

        if (failed) {
                if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
                        CDEBUG(D_NETERROR, "Tx -> %s cookie "LPX64
                               " sending %d waiting %d: failed %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid),
                               tx->tx_cookie, tx->tx_sending, tx->tx_waiting,
                               status);

                kiblnd_close_conn(conn, -EIO);
        } else {
                kiblnd_peer_alive(conn->ibc_peer);
        }

        spin_lock(&conn->ibc_lock);

        /* I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'. */

        tx->tx_sending--;
        conn->ibc_nsends_posted--;

        if (failed) {
                tx->tx_waiting = 0;             /* don't wait for peer */
                tx->tx_status = -EIO;
        }

        idle = (tx->tx_sending == 0) &&         /* This is the final callback */
               !tx->tx_waiting &&               /* Not waiting for peer */
               !tx->tx_queued;                  /* Not re-queued (PUT_DONE) */
        if (idle)
                list_del(&tx->tx_list);

        kiblnd_conn_addref(conn);               /* 1 ref for me.... */

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kiblnd_tx_done(conn->ibc_peer->ibp_ni, tx);

        kiblnd_check_sends(conn);

        kiblnd_conn_decref(conn);               /* ...until here */
}

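/* Append the SEND work request that carries the message itself; this is
 * always the last work item, so any RDMA transfer completes before the
 * message (and the credits it carries) arrives. */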
void
kiblnd_init_tx_msg (lnet_ni_t *ni, kib_tx_t *tx, int type, int body_nob)
{
        kib_net_t         *net = ni->ni_data;
        struct ib_sge     *sge = &tx->tx_sge[tx->tx_nwrq];
        struct ib_send_wr *wrq = &tx->tx_wrq[tx->tx_nwrq];
        int                nob = offsetof (kib_msg_t, ibm_u) + body_nob;

        LASSERT (net != NULL);
        LASSERT (tx->tx_nwrq >= 0);
        LASSERT (tx->tx_nwrq < IBLND_MAX_RDMA_FRAGS + 1);
        LASSERT (nob <= IBLND_MSG_SIZE);

        kiblnd_init_msg(tx->tx_msg, type, body_nob);

        sge->addr = tx->tx_msgaddr;
        sge->lkey = net->ibn_dev->ibd_mr->lkey;
        sge->length = nob;

        memset(wrq, 0, sizeof(*wrq));

        wrq->next       = NULL;
        wrq->wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_TX);
        wrq->sg_list    = sge;
        wrq->num_sge    = 1;
        wrq->opcode     = IB_WR_SEND;
        wrq->send_flags = IB_SEND_SIGNALED;

        tx->tx_nwrq++;
}

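/* Set up the RDMA WRITE work request(s) that move 'nob' bytes from the
 * local source descriptor (tx->tx_rd) into the peer's descriptor
 * 'dstrd', then append the GET_DONE/PUT_DONE completion message. */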
int
kiblnd_init_rdma (lnet_ni_t *ni, kib_tx_t *tx, int type,
                  int nob, kib_rdma_desc_t *dstrd, __u64 dstcookie)
{
        kib_msg_t         *ibmsg = tx->tx_msg;
        kib_rdma_desc_t   *srcrd = tx->tx_rd;
        struct ib_sge     *sge = &tx->tx_sge[0];
        struct ib_send_wr *wrq = &tx->tx_wrq[0];
        int                rc = nob;

#if IBLND_MAP_ON_DEMAND
        LASSERT (!in_interrupt());
        LASSERT (tx->tx_nwrq == 0);
        LASSERT (type == IBLND_MSG_GET_DONE ||
                 type == IBLND_MSG_PUT_DONE);

        sge->addr = srcrd->rd_addr;
        sge->lkey = srcrd->rd_key;
        sge->length = nob;

        wrq = &tx->tx_wrq[0];

        wrq->next       = &tx->tx_wrq[1];
        wrq->wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_RDMA);
        wrq->sg_list    = sge;
        wrq->num_sge    = 1;
        wrq->opcode     = IB_WR_RDMA_WRITE;
        wrq->send_flags = 0;

        wrq->wr.rdma.remote_addr = dstrd->rd_addr;
        wrq->wr.rdma.rkey        = dstrd->rd_key;

        tx->tx_nwrq = 1;
#else
        /* CAVEAT EMPTOR: this 'consumes' the frags in 'dstrd' */
        int              resid = nob;
        kib_rdma_frag_t *srcfrag;
        int              srcidx;
        kib_rdma_frag_t *dstfrag;
        int              dstidx;
        int              wrknob;

        LASSERT (!in_interrupt());
        LASSERT (tx->tx_nwrq == 0);
        LASSERT (type == IBLND_MSG_GET_DONE ||
                 type == IBLND_MSG_PUT_DONE);

        srcidx = dstidx = 0;
        srcfrag = &srcrd->rd_frags[0];
        dstfrag = &dstrd->rd_frags[0];

        while (resid > 0) {
                if (srcidx >= srcrd->rd_nfrags) {
                        CERROR("Src buffer exhausted: %d frags\n", srcidx);
                        rc = -EPROTO;
                        break;
                }

                if (dstidx == dstrd->rd_nfrags) {
                        CERROR("Dst buffer exhausted: %d frags\n", dstidx);
                        rc = -EPROTO;
                        break;
                }

                if (tx->tx_nwrq == IBLND_MAX_RDMA_FRAGS) {
                        CERROR("RDMA too fragmented: %d/%d src %d/%d dst frags\n",
                               srcidx, srcrd->rd_nfrags,
                               dstidx, dstrd->rd_nfrags);
                        rc = -EMSGSIZE;
                        break;
                }

                wrknob = MIN(MIN(srcfrag->rf_nob, dstfrag->rf_nob), resid);

                sge = &tx->tx_sge[tx->tx_nwrq];
                sge->addr   = srcfrag->rf_addr;
                sge->length = wrknob;
                sge->lkey   = srcrd->rd_key;

                wrq = &tx->tx_wrq[tx->tx_nwrq];

                wrq->next       = wrq + 1;
                wrq->wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_RDMA);
                wrq->sg_list    = sge;
                wrq->num_sge    = 1;
                wrq->opcode     = IB_WR_RDMA_WRITE;
                wrq->send_flags = 0;

                wrq->wr.rdma.remote_addr = dstfrag->rf_addr;
                wrq->wr.rdma.rkey        = dstrd->rd_key;

                wrq++;
                sge++;

                resid -= wrknob;
                if (wrknob < srcfrag->rf_nob) {
                        srcfrag->rf_nob  -= wrknob;
                        srcfrag->rf_addr += wrknob;
                } else {
                        srcfrag++;
                        srcidx++;
                }

                if (wrknob < dstfrag->rf_nob) {
                        dstfrag->rf_nob  -= wrknob;
                        dstfrag->rf_addr += wrknob;
                } else {
                        dstfrag++;
                        dstidx++;
                }

                tx->tx_nwrq++;
        }

        if (rc < 0)                             /* no RDMA if completing with failure */
                tx->tx_nwrq = 0;
#endif
        ibmsg->ibm_u.completion.ibcm_status = rc;
        ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;
        kiblnd_init_tx_msg(ni, tx, type, sizeof (kib_completion_msg_t));

        return rc;
}

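/* Schedule 'tx' on the queue that matches its message type: requests
 * wait for reserved credits, completions go out without consuming a
 * credit, and NOOPs have their own queue.  Caller holds ibc_lock. */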
void
kiblnd_queue_tx_locked (kib_tx_t *tx, kib_conn_t *conn)
{
        struct list_head   *q;

        LASSERT (tx->tx_nwrq > 0);              /* work items set up */
        LASSERT (!tx->tx_queued);               /* not queued for sending already */

        tx->tx_queued = 1;
        tx->tx_deadline = jiffies + (*kiblnd_tunables.kib_timeout * HZ);

        if (tx->tx_conn == NULL) {
                kiblnd_conn_addref(conn);
                tx->tx_conn = conn;
                LASSERT (tx->tx_msg->ibm_type != IBLND_MSG_PUT_DONE);
        } else {
                /* PUT_DONE first attached to conn as a PUT_REQ */
                LASSERT (tx->tx_conn == conn);
                LASSERT (tx->tx_msg->ibm_type == IBLND_MSG_PUT_DONE);
        }

        switch (tx->tx_msg->ibm_type) {
        default:
                LBUG();

        case IBLND_MSG_PUT_REQ:
        case IBLND_MSG_GET_REQ:
                q = &conn->ibc_tx_queue_rsrvd;
                break;

        case IBLND_MSG_PUT_NAK:
        case IBLND_MSG_PUT_ACK:
        case IBLND_MSG_PUT_DONE:
        case IBLND_MSG_GET_DONE:
                q = &conn->ibc_tx_queue_nocred;
                break;

        case IBLND_MSG_NOOP:
                q = &conn->ibc_tx_noops;
                break;

        case IBLND_MSG_IMMEDIATE:
                q = &conn->ibc_tx_queue;
                break;
        }

        list_add_tail(&tx->tx_list, q);
}

void
kiblnd_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
{
        spin_lock(&conn->ibc_lock);
        kiblnd_queue_tx_locked(tx, conn);
        spin_unlock(&conn->ibc_lock);

        kiblnd_check_sends(conn);
}

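/* Start active connection establishment: create an RDMA CM ID bound to
 * the local interface and kick off address resolution towards the
 * peer's NID.  Progress continues in kiblnd_cm_callback(). */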
void
kiblnd_connect_peer (kib_peer_t *peer)
{
        struct rdma_cm_id *cmid;
        kib_net_t         *net = peer->ibp_ni->ni_data;
        struct sockaddr_in srcaddr;
        struct sockaddr_in dstaddr;
        int                rc;

        LASSERT (net != NULL);
        LASSERT (peer->ibp_connecting > 0);

        cmid = rdma_create_id(kiblnd_cm_callback, peer, RDMA_PS_TCP);
        if (IS_ERR(cmid)) {
                CERROR("Can't create CMID for %s: %ld\n",
                       libcfs_nid2str(peer->ibp_nid), PTR_ERR(cmid));
                rc = PTR_ERR(cmid);
                goto failed;
        }

        memset(&srcaddr, 0, sizeof(srcaddr));
        srcaddr.sin_family = AF_INET;
        srcaddr.sin_addr.s_addr = htonl(net->ibn_dev->ibd_ifip);

        memset(&dstaddr, 0, sizeof(dstaddr));
        dstaddr.sin_family = AF_INET;
        dstaddr.sin_port = htons(*kiblnd_tunables.kib_service);
        dstaddr.sin_addr.s_addr = htonl(LNET_NIDADDR(peer->ibp_nid));

        kiblnd_peer_addref(peer);               /* cmid's ref */

        rc = rdma_resolve_addr(cmid,
                               (struct sockaddr *)&srcaddr,
                               (struct sockaddr *)&dstaddr,
                               *kiblnd_tunables.kib_timeout * 1000);
        if (rc == 0)
                return;

        /* Can't initiate address resolution */
        CERROR("Can't resolve addr for %s: %d\n",
               libcfs_nid2str(peer->ibp_nid), rc);

        kiblnd_peer_decref(peer);               /* cmid's ref */
        rdma_destroy_id(cmid);
 failed:
        kiblnd_peer_connect_failed(peer, 1, rc);
}

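/* Queue 'tx' for sending to 'nid': use an existing connection if one is
 * established, queue behind an in-progress connection attempt, or
 * create the peer and initiate a new connection. */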
1396 void
1397 kiblnd_launch_tx (lnet_ni_t *ni, kib_tx_t *tx, lnet_nid_t nid)
1398 {
1399         kib_peer_t        *peer;
1400         kib_peer_t        *peer2;
1401         kib_conn_t        *conn;
1402         rwlock_t          *g_lock = &kiblnd_data.kib_global_lock;
1403         unsigned long      flags;
1404         int                rc;
1405
1406         /* If I get here, I've committed to send, so I complete the tx with
1407          * failure on any problems */
1408
1409         LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
1410         LASSERT (tx->tx_nwrq > 0);              /* work items have been set up */
1411
1412         /* First time, just use a read lock since I expect to find my peer
1413          * connected */
1414         read_lock_irqsave(g_lock, flags);
1415
1416         peer = kiblnd_find_peer_locked(nid);
1417         if (peer != NULL && !list_empty(&peer->ibp_conns)) {
1418                 /* Found a peer with an established connection */
1419                 conn = kiblnd_get_conn_locked(peer);
1420                 kiblnd_conn_addref(conn); /* 1 ref for me... */
1421
1422                 read_unlock_irqrestore(g_lock, flags);
1423
1424                 kiblnd_queue_tx(tx, conn);
1425                 kiblnd_conn_decref(conn); /* ...to here */
1426                 return;
1427         }
1428
1429         read_unlock(g_lock);
1430         /* Re-try with a write lock */
1431         write_lock(g_lock);
1432
1433         peer = kiblnd_find_peer_locked(nid);
1434         if (peer != NULL) {
1435                 if (list_empty(&peer->ibp_conns)) {
1436                         /* found a peer, but it's still connecting... */
1437                         LASSERT (peer->ibp_connecting != 0 ||
1438                                  peer->ibp_accepting != 0);
1439                         list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);
1440                         write_unlock_irqrestore(g_lock, flags);
1441                 } else {
1442                         conn = kiblnd_get_conn_locked(peer);
1443                         kiblnd_conn_addref(conn); /* 1 ref for me... */
1444                         
1445                         write_unlock_irqrestore(g_lock, flags);
1446                         
1447                         kiblnd_queue_tx(tx, conn);
1448                         kiblnd_conn_decref(conn); /* ...to here */
1449                 }
1450                 return;
1451         }
1452
1453         write_unlock_irqrestore(g_lock, flags);
1454
1455         /* Allocate a peer ready to add to the peer table and retry */
1456         rc = kiblnd_create_peer(ni, &peer, nid);
1457         if (rc != 0) {
1458                 CERROR("Can't create peer %s\n", libcfs_nid2str(nid));
1459                 tx->tx_status = -EHOSTUNREACH;
1460                 tx->tx_waiting = 0;
1461                 kiblnd_tx_done(ni, tx);
1462                 return;
1463         }
1464
1465         write_lock_irqsave(g_lock, flags);
1466
1467         peer2 = kiblnd_find_peer_locked(nid);
1468         if (peer2 != NULL) {
1469                 if (list_empty(&peer2->ibp_conns)) {
1470                         /* found a peer, but it's still connecting... */
1471                         LASSERT (peer2->ibp_connecting != 0 ||
1472                                  peer2->ibp_accepting != 0);
1473                         list_add_tail (&tx->tx_list, &peer2->ibp_tx_queue);
1474                         write_unlock_irqrestore(g_lock, flags);
1475                 } else {
1476                         conn = kiblnd_get_conn_locked(peer2);
1477                         kiblnd_conn_addref(conn); /* 1 ref for me... */
1478
1479                         write_unlock_irqrestore(g_lock, flags);
1480                         
1481                         kiblnd_queue_tx(tx, conn);
1482                         kiblnd_conn_decref(conn); /* ...to here */
1483                 }
1484
1485                 kiblnd_peer_decref(peer);
1486                 return;
1487         }
1488
1489         /* Brand new peer */
1490         LASSERT (peer->ibp_connecting == 0);
1491         peer->ibp_connecting = 1;
1492
1493         /* always called with a ref on ni, which prevents ni being shut down */
1494         LASSERT (((kib_net_t *)ni->ni_data)->ibn_shutdown == 0);
1495
1496         list_add_tail(&tx->tx_list, &peer->ibp_tx_queue);
1497
1498         kiblnd_peer_addref(peer);
1499         list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));
1500
1501         write_unlock_irqrestore(g_lock, flags);
1502
1503         kiblnd_connect_peer(peer);
1504         kiblnd_peer_decref(peer);
1505 }
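     /* NB: a caller hands kiblnd_launch_tx() a fully initialised tx and must
      * not touch it again; the tx is either queued on a connection or
      * completed with an error via kiblnd_tx_done().  The usage pattern (see
      * kiblnd_send() below) is roughly:
      *
      *         tx->tx_lntmsg[0] = lntmsg;    finalise lntmsg on completion
      *         tx->tx_waiting = 1;           waiting for the peer's response
      *         kiblnd_launch_tx(ni, tx, target.nid);
      *         return 0;                     tx now belongs to the LND
      */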
1506
1507 int
1508 kiblnd_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1509 {
1510         lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
1511         int               type = lntmsg->msg_type;
1512         lnet_process_id_t target = lntmsg->msg_target;
1513         int               target_is_router = lntmsg->msg_target_is_router;
1514         int               routing = lntmsg->msg_routing;
1515         unsigned int      payload_niov = lntmsg->msg_niov;
1516         struct iovec     *payload_iov = lntmsg->msg_iov;
1517         lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
1518         unsigned int      payload_offset = lntmsg->msg_offset;
1519         unsigned int      payload_nob = lntmsg->msg_len;
1520         kib_msg_t        *ibmsg;
1521         kib_tx_t         *tx;
1522         int               nob;
1523         int               rc;
1524
1525         /* NB 'private' is different depending on what we're sending.... */
1526
1527         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1528                payload_nob, payload_niov, libcfs_id2str(target));
1529
1530         LASSERT (payload_nob == 0 || payload_niov > 0);
1531         LASSERT (payload_niov <= LNET_MAX_IOV);
1532
1533         /* Thread context */
1534         LASSERT (!in_interrupt());
1535         /* payload is either all vaddrs or all pages */
1536         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1537
1538         switch (type) {
1539         default:
1540                 LBUG();
1541                 return (-EIO);
1542
1543         case LNET_MSG_ACK:
1544                 LASSERT (payload_nob == 0);
1545                 break;
1546
1547         case LNET_MSG_GET:
1548                 if (routing || target_is_router)
1549                         break;                  /* send IMMEDIATE */
1550
1551                 /* is the REPLY message too small for RDMA? */
1552                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
1553                 if (nob <= IBLND_MSG_SIZE)
1554                         break;                  /* send IMMEDIATE */
1555
1556                 tx = kiblnd_get_idle_tx(ni);
1557                 if (tx == NULL) {
1558                         CERROR("Can't allocate txd for GET to %s\n",
1559                                libcfs_nid2str(target.nid));
1560                         return -ENOMEM;
1561                 }
1562
1563                 ibmsg = tx->tx_msg;
1564                 ibmsg->ibm_u.get.ibgm_hdr = *hdr;
1565                 ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie;
1566
1567                 if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
1568                         rc = kiblnd_setup_rd_iov(ni, tx,
1569                                                  &ibmsg->ibm_u.get.ibgm_rd,
1570                                                  lntmsg->msg_md->md_niov,
1571                                                  lntmsg->msg_md->md_iov.iov,
1572                                                  0, lntmsg->msg_md->md_length);
1573                 else
1574                         rc = kiblnd_setup_rd_kiov(ni, tx,
1575                                                   &ibmsg->ibm_u.get.ibgm_rd,
1576                                                   lntmsg->msg_md->md_niov,
1577                                                   lntmsg->msg_md->md_iov.kiov,
1578                                                   0, lntmsg->msg_md->md_length);
1579                 if (rc != 0) {
1580                         CERROR("Can't setup GET sink for %s: %d\n",
1581                                libcfs_nid2str(target.nid), rc);
1582                         kiblnd_tx_done(ni, tx);
1583                         return -EIO;
1584                 }
1585 #if IBLND_MAP_ON_DEMAND
1586                 nob = sizeof(kib_get_msg_t);
1587 #else
1588                 nob = offsetof(kib_get_msg_t, ibgm_rd.rd_frags[tx->tx_nfrags]);
1589 #endif
1590                 kiblnd_init_tx_msg(ni, tx, IBLND_MSG_GET_REQ, nob);
1591
1592                 tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
1593                 if (tx->tx_lntmsg[1] == NULL) {
1594                         CERROR("Can't create reply for GET -> %s\n",
1595                                libcfs_nid2str(target.nid));
1596                         kiblnd_tx_done(ni, tx);
1597                         return -EIO;
1598                 }
1599
1600                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg[0,1] on completion */
1601                 tx->tx_waiting = 1;             /* waiting for GET_DONE */
1602                 kiblnd_launch_tx(ni, tx, target.nid);
1603                 return 0;
1604
1605         case LNET_MSG_REPLY:
1606         case LNET_MSG_PUT:
1607                 /* Is the payload small enough not to need RDMA? */
1608                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1609                 if (nob <= IBLND_MSG_SIZE)
1610                         break;                  /* send IMMEDIATE */
1611
1612                 tx = kiblnd_get_idle_tx(ni);
1613                 if (tx == NULL) {
1614                         CERROR("Can't allocate %s txd for %s\n",
1615                                type == LNET_MSG_PUT ? "PUT" : "REPLY",
1616                                libcfs_nid2str(target.nid));
1617                         return -ENOMEM;
1618                 }
1619
1620                 if (payload_kiov == NULL)
1621                         rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
1622                                                  payload_niov, payload_iov,
1623                                                  payload_offset, payload_nob);
1624                 else
1625                         rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
1626                                                   payload_niov, payload_kiov,
1627                                                   payload_offset, payload_nob);
1628                 if (rc != 0) {
1629                         CERROR("Can't setup PUT src for %s: %d\n",
1630                                libcfs_nid2str(target.nid), rc);
1631                         kiblnd_tx_done(ni, tx);
1632                         return -EIO;
1633                 }
1634
1635                 ibmsg = tx->tx_msg;
1636                 ibmsg->ibm_u.putreq.ibprm_hdr = *hdr;
1637                 ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie;
1638                 kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_REQ, sizeof(kib_putreq_msg_t));
1639
1640                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1641                 tx->tx_waiting = 1;             /* waiting for PUT_{ACK,NAK} */
1642                 kiblnd_launch_tx(ni, tx, target.nid);
1643                 return 0;
1644         }
1645
1646         /* send IMMEDIATE */
1647
1648         LASSERT (offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob])
1649                  <= IBLND_MSG_SIZE);
1650
1651         tx = kiblnd_get_idle_tx(ni);
1652         if (tx == NULL) {
1653                 CERROR ("Can't send %d to %s: tx descs exhausted\n",
1654                         type, libcfs_nid2str(target.nid));
1655                 return -ENOMEM;
1656         }
1657
1658         ibmsg = tx->tx_msg;
1659         ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1660
1661         if (payload_kiov != NULL)
1662                 lnet_copy_kiov2flat(IBLND_MSG_SIZE, ibmsg,
1663                                     offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1664                                     payload_niov, payload_kiov,
1665                                     payload_offset, payload_nob);
1666         else
1667                 lnet_copy_iov2flat(IBLND_MSG_SIZE, ibmsg,
1668                                    offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1669                                    payload_niov, payload_iov,
1670                                    payload_offset, payload_nob);
1671
1672         nob = offsetof(kib_immediate_msg_t, ibim_payload[payload_nob]);
1673         kiblnd_init_tx_msg(ni, tx, IBLND_MSG_IMMEDIATE, nob);
1674
1675         tx->tx_lntmsg[0] = lntmsg;              /* finalise lntmsg on completion */
1676         kiblnd_launch_tx(ni, tx, target.nid);
1677         return 0;
1678 }
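     /* NB: the three send strategies above, roughly:
      *
      *         payload fits in IBLND_MSG_SIZE
      *                 -> IBLND_MSG_IMMEDIATE with the payload inline
      *         large PUT (or REPLY)
      *                 -> IBLND_MSG_PUT_REQ; the peer returns PUT_ACK with a
      *                    sink descriptor, I RDMA the payload and send
      *                    PUT_DONE
      *         large GET reply
      *                 -> IBLND_MSG_GET_REQ carrying my sink descriptor; the
      *                    peer RDMAs the reply into it and sends GET_DONE
      */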
1679
1680 void
1681 kiblnd_reply (lnet_ni_t *ni, kib_rx_t *rx, lnet_msg_t *lntmsg)
1682 {
1683         lnet_process_id_t target = lntmsg->msg_target;
1684         unsigned int      niov = lntmsg->msg_niov;
1685         struct iovec     *iov = lntmsg->msg_iov;
1686         lnet_kiov_t      *kiov = lntmsg->msg_kiov;
1687         unsigned int      offset = lntmsg->msg_offset;
1688         unsigned int      nob = lntmsg->msg_len;
1689         kib_tx_t         *tx;
1690         int               rc;
1691
1692         tx = kiblnd_get_idle_tx(ni);
1693         if (tx == NULL) {
1694                 CERROR("Can't get tx for REPLY to %s\n",
1695                        libcfs_nid2str(target.nid));
1696                 goto failed_0;
1697         }
1698
1699         if (nob == 0)
1700                 rc = 0;
1701         else if (kiov == NULL)
1702                 rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
1703                                          niov, iov, offset, nob);
1704         else
1705                 rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
1706                                           niov, kiov, offset, nob);
1707
1708         if (rc != 0) {
1709                 CERROR("Can't setup GET src for %s: %d\n",
1710                        libcfs_nid2str(target.nid), rc);
1711                 goto failed_1;
1712         }
1713
1714         rc = kiblnd_init_rdma(ni, tx, IBLND_MSG_GET_DONE, nob,
1715                               &rx->rx_msg->ibm_u.get.ibgm_rd,
1716                               rx->rx_msg->ibm_u.get.ibgm_cookie);
1717         if (rc < 0) {
1718                 CERROR("Can't setup rdma for GET from %s: %d\n",
1719                        libcfs_nid2str(target.nid), rc);
1720                 goto failed_1;
1721         }
1722         
1723         if (nob == 0) {
1724                 /* No RDMA: local completion may happen now! */
1725                 lnet_finalize(ni, lntmsg, 0);
1726         } else {
1727                 /* RDMA: lnet_finalize(lntmsg) when it
1728                  * completes */
1729                 tx->tx_lntmsg[0] = lntmsg;
1730         }
1731
1732         kiblnd_queue_tx(tx, rx->rx_conn);
1733         return;
1734
1735  failed_1:
1736         kiblnd_tx_done(ni, tx);
1737  failed_0:
1738         lnet_finalize(ni, lntmsg, -EIO);
1739 }
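     /* NB: kiblnd_reply() services an "optimized GET": the GET_REQ carried
      * the requester's sink descriptor (ibgm_rd), so the reply payload is
      * RDMAed straight into the requester's buffer and GET_DONE completes
      * the transfer.  A zero-length reply skips the RDMA entirely, hence
      * the immediate lnet_finalize() above. */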
1740
1741 int
1742 kiblnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
1743              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
1744              unsigned int offset, unsigned int mlen, unsigned int rlen)
1745 {
1746         kib_rx_t    *rx = private;
1747         kib_msg_t   *rxmsg = rx->rx_msg;
1748         kib_conn_t  *conn = rx->rx_conn;
1749         kib_tx_t    *tx;
1750         kib_msg_t   *txmsg;
1751         int          nob;
1752         int          post_credit = IBLND_POSTRX_PEER_CREDIT;
1753         int          rc = 0;
1754
1755         LASSERT (mlen <= rlen);
1756         LASSERT (!in_interrupt());
1757         /* Either all pages or all vaddrs */
1758         LASSERT (!(kiov != NULL && iov != NULL));
1759
1760         switch (rxmsg->ibm_type) {
1761         default:
1762                 LBUG();
1763
1764         case IBLND_MSG_IMMEDIATE:
1765                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
1766                 if (nob > rx->rx_nob) {
1767                         CERROR ("Immediate message from %s too big: %d (%d received)\n",
1768                                 libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
1769                                 nob, rx->rx_nob);
1770                         rc = -EPROTO;
1771                         break;
1772                 }
1773
1774                 if (kiov != NULL)
1775                         lnet_copy_flat2kiov(niov, kiov, offset,
1776                                             IBLND_MSG_SIZE, rxmsg,
1777                                             offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1778                                             mlen);
1779                 else
1780                         lnet_copy_flat2iov(niov, iov, offset,
1781                                            IBLND_MSG_SIZE, rxmsg,
1782                                            offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1783                                            mlen);
1784                 lnet_finalize (ni, lntmsg, 0);
1785                 break;
1786
1787         case IBLND_MSG_PUT_REQ:
1788                 if (mlen == 0) {
1789                         lnet_finalize(ni, lntmsg, 0);
1790                         kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, 0,
1791                                                rxmsg->ibm_u.putreq.ibprm_cookie);
1792                         break;
1793                 }
1794
1795                 tx = kiblnd_get_idle_tx(ni);
1796                 if (tx == NULL) {
1797                         CERROR("Can't allocate tx for %s\n",
1798                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
1799                         /* Not replying will break the connection */
1800                         rc = -ENOMEM;
1801                         break;
1802                 }
1803
1804                 txmsg = tx->tx_msg;
1805                 if (kiov == NULL)
1806                         rc = kiblnd_setup_rd_iov(ni, tx,
1807                                                  &txmsg->ibm_u.putack.ibpam_rd,
1808                                                  niov, iov, offset, mlen);
1809                 else
1810                         rc = kiblnd_setup_rd_kiov(ni, tx, 
1811                                                   &txmsg->ibm_u.putack.ibpam_rd,
1812                                                   niov, kiov, offset, mlen);
1813                 if (rc != 0) {
1814                         CERROR("Can't setup PUT sink for %s: %d\n",
1815                                libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
1816                         kiblnd_tx_done(ni, tx);
1817                         /* tell peer it's over */
1818                         kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, rc,
1819                                                rxmsg->ibm_u.putreq.ibprm_cookie);
1820                         break;
1821                 }
1822
1823                 txmsg->ibm_u.putack.ibpam_src_cookie = rxmsg->ibm_u.putreq.ibprm_cookie;
1824                 txmsg->ibm_u.putack.ibpam_dst_cookie = tx->tx_cookie;
1825 #if IBLND_MAP_ON_DEMAND
1826                 nob = sizeof(kib_putack_msg_t);
1827 #else
1828                 nob = offsetof(kib_putack_msg_t, ibpam_rd.rd_frags[tx->tx_nfrags]);
1829 #endif
1830                 kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_ACK, nob);
1831
1832                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1833                 tx->tx_waiting = 1;             /* waiting for PUT_DONE */
1834                 kiblnd_queue_tx(tx, conn);
1835
1836                 /* reposted buffer reserved for PUT_DONE */
1837                 post_credit = IBLND_POSTRX_NO_CREDIT;
1838                 break;
1839
1840         case IBLND_MSG_GET_REQ:
1841                 if (lntmsg != NULL) {
1842                         /* Optimized GET; RDMA lntmsg's payload */
1843                         kiblnd_reply(ni, rx, lntmsg);
1844                 } else {
1845                         /* GET didn't match anything */
1846                         kiblnd_send_completion(rx->rx_conn, IBLND_MSG_GET_DONE,
1847                                                -ENODATA,
1848                                                rxmsg->ibm_u.get.ibgm_cookie);
1849                 }
1850                 break;
1851         }
1852
1853         kiblnd_post_rx(rx, post_credit);
1854         return rc;
1855 }
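     /* NB on receive credits: a consumed rx buffer is normally re-posted
      * with a credit returned to the peer (IBLND_POSTRX_PEER_CREDIT).  The
      * PUT_REQ path re-posts with IBLND_POSTRX_NO_CREDIT instead because
      * that buffer is reserved for the matching PUT_DONE. */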
1856
1857 int
1858 kiblnd_thread_start (int (*fn)(void *arg), void *arg)
1859 {
1860         long    pid = kernel_thread (fn, arg, 0);
1861
1862         if (pid < 0)
1863                 return ((int)pid);
1864
1865         atomic_inc (&kiblnd_data.kib_nthreads);
1866         return (0);
1867 }
1868
1869 void
1870 kiblnd_thread_fini (void)
1871 {
1872         atomic_dec (&kiblnd_data.kib_nthreads);
1873 }
1874
1875 void
1876 kiblnd_peer_alive (kib_peer_t *peer)
1877 {
1878         /* This is racy, but everyone's only writing cfs_time_current() */
1879         peer->ibp_last_alive = cfs_time_current();
1880         mb();
1881 }
1882
1883 void
1884 kiblnd_peer_notify (kib_peer_t *peer)
1885 {
1886         time_t        last_alive = 0;
1887         int           error = 0;
1888         unsigned long flags;
1889
1890         read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1891
1892         if (list_empty(&peer->ibp_conns) &&
1893             peer->ibp_accepting == 0 &&
1894             peer->ibp_connecting == 0 &&
1895             peer->ibp_error != 0) {
1896                 error = peer->ibp_error;
1897                 peer->ibp_error = 0;
1898
1899                 last_alive = cfs_time_current_sec() -
1900                              cfs_duration_sec(cfs_time_current() -
1901                                               peer->ibp_last_alive);
1902         }
1903
1904         read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1905
1906         if (error != 0)
1907                 lnet_notify(peer->ibp_ni,
1908                             peer->ibp_nid, 0, last_alive);
1909 }
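     /* NB: ibp_last_alive is in jiffies but lnet_notify() wants wall-clock
      * seconds, hence the conversion above; roughly:
      *
      *         last_alive = now_in_seconds - seconds_since(ibp_last_alive)
      *
      * e.g. with HZ = 1000 (an illustrative value) and a peer last heard
      * from 10000 jiffies ago, last_alive is 10 seconds before now. */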
1910
1911 void
1912 kiblnd_close_conn_locked (kib_conn_t *conn, int error)
1913 {
1914         /* This just does the immediate housekeeping.  'error' is zero for a
1915          * normal shutdown which can happen only after the connection has been
1916          * established.  If the connection is established, schedule the
1917          * connection to be finished off by the connd.  Otherwise the connd is
1918          * already dealing with it (either to set it up or tear it down).
1919          * Caller holds kib_global_lock exclusively in irq context */
1920         unsigned long     flags;
1921         kib_peer_t       *peer = conn->ibc_peer;
1922
1923         LASSERT (error != 0 || conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1924
1925         if (error != 0 && conn->ibc_comms_error == 0)
1926                 conn->ibc_comms_error = error;
1927
1928         if (conn->ibc_state != IBLND_CONN_ESTABLISHED)
1929                 return; /* already being handled  */
1930
1931         if (error == 0 &&
1932             list_empty(&conn->ibc_tx_noops) &&
1933             list_empty(&conn->ibc_tx_queue) &&
1934             list_empty(&conn->ibc_tx_queue_rsrvd) &&
1935             list_empty(&conn->ibc_tx_queue_nocred) &&
1936             list_empty(&conn->ibc_active_txs)) {
1937                 CDEBUG(D_NET, "closing conn to %s\n", 
1938                        libcfs_nid2str(peer->ibp_nid));
1939         } else {
1940                 CDEBUG(D_NETERROR, "Closing conn to %s: error %d%s%s%s%s%s\n",
1941                        libcfs_nid2str(peer->ibp_nid), error,
1942                        list_empty(&conn->ibc_tx_queue) ? "" : "(sending)",
1943                        list_empty(&conn->ibc_tx_noops) ? "" : "(sending_noops)",
1944                        list_empty(&conn->ibc_tx_queue_rsrvd) ? "" : "(sending_rsrvd)",
1945                        list_empty(&conn->ibc_tx_queue_nocred) ? "" : "(sending_nocred)",
1946                        list_empty(&conn->ibc_active_txs) ? "" : "(waiting)");
1947         }
1948
1949         list_del (&conn->ibc_list);
1950         /* connd (see below) takes over ibc_list's ref */
1951
1952         if (list_empty (&peer->ibp_conns) &&    /* no more conns */
1953             kiblnd_peer_active(peer)) {         /* still in peer table */
1954                 kiblnd_unlink_peer_locked(peer);
1955
1956                 /* set/clear error on last conn */
1957                 peer->ibp_error = conn->ibc_comms_error;
1958         }
1959
1960         kiblnd_set_conn_state(conn, IBLND_CONN_CLOSING);
1961
1962         spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
1963
1964         list_add_tail (&conn->ibc_list, &kiblnd_data.kib_connd_conns);
1965         wake_up (&kiblnd_data.kib_connd_waitq);
1966
1967         spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
1968 }
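     /* NB: the conn is handed off to the connd here: the ref that ibc_list
      * held on the peer's conn list moves to kib_connd_conns, and the woken
      * connd thread finishes the connection off outside this lock. */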
1969
1970 void
1971 kiblnd_close_conn (kib_conn_t *conn, int error)
1972 {
1973         unsigned long flags;
1974
1975         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1976
1977         kiblnd_close_conn_locked(conn, error);
1978
1979         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1980 }
1981
1982 void
1983 kiblnd_handle_early_rxs(kib_conn_t *conn)
1984 {
1985         unsigned long    flags;
1986         kib_rx_t        *rx;
1987
1988         LASSERT (!in_interrupt());
1989         LASSERT (conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1990
1991         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1992         while (!list_empty(&conn->ibc_early_rxs)) {
1993                 rx = list_entry(conn->ibc_early_rxs.next,
1994                                 kib_rx_t, rx_list);
1995                 list_del(&rx->rx_list);
1996                 write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1997
1998                 kiblnd_handle_rx(rx);
1999
2000                 write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2001         }
2002         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2003 }
2004
2005 void
2006 kiblnd_abort_txs(kib_conn_t *conn, struct list_head *txs)
2007 {
2008         LIST_HEAD           (zombies); 
2009         struct list_head    *tmp;
2010         struct list_head    *nxt;
2011         kib_tx_t            *tx;
2012
2013         spin_lock(&conn->ibc_lock);
2014
2015         list_for_each_safe (tmp, nxt, txs) {
2016                 tx = list_entry (tmp, kib_tx_t, tx_list);
2017
2018                 if (txs == &conn->ibc_active_txs) {
2019                         LASSERT (!tx->tx_queued);
2020                         LASSERT (tx->tx_waiting ||
2021                                  tx->tx_sending != 0);
2022                 } else {
2023                         LASSERT (tx->tx_queued);
2024                 }
2025
2026                 tx->tx_status = -ECONNABORTED;
2027                 tx->tx_waiting = 0;
2028
2029                 if (tx->tx_sending == 0) {
2030                         tx->tx_queued = 0;
2031                         list_del (&tx->tx_list);
2032                         list_add (&tx->tx_list, &zombies);
2033                 }
2034         }
2035
2036         spin_unlock(&conn->ibc_lock);
2037
2038         kiblnd_txlist_done(conn->ibc_peer->ibp_ni,
2039                            &zombies, -ECONNABORTED);
2040 }
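     /* NB: only txs with tx_sending == 0 can be zombified above; a tx still
      * on the wire keeps its -ECONNABORTED status and is completed later,
      * once its outstanding send completions have arrived. */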
2041
2042 void
2043 kiblnd_finalise_conn (kib_conn_t *conn)
2044 {
2045         LASSERT (!in_interrupt());
2046         LASSERT (conn->ibc_state > IBLND_CONN_INIT);
2047
2048         kiblnd_set_conn_state(conn, IBLND_CONN_DISCONNECTED);
2049
2050         /* abort_receives moves QP state to IB_QPS_ERR.  This is only required
2051          * for connections that didn't get as far as being connected, because
2052          * rdma_disconnect() does this for free. */
2053         kiblnd_abort_receives(conn);
2054
2055         /* Complete all tx descs not waiting for sends to complete.
2056          * NB we should be safe from RDMA now that the QP has changed state */
2057
2058         kiblnd_abort_txs(conn, &conn->ibc_tx_noops);
2059         kiblnd_abort_txs(conn, &conn->ibc_tx_queue);
2060         kiblnd_abort_txs(conn, &conn->ibc_tx_queue_rsrvd);
2061         kiblnd_abort_txs(conn, &conn->ibc_tx_queue_nocred);
2062         kiblnd_abort_txs(conn, &conn->ibc_active_txs);
2063
2064         kiblnd_handle_early_rxs(conn);
2065 }
2066
2067 void
2068 kiblnd_peer_connect_failed (kib_peer_t *peer, int active, int error)
2069 {
2070         LIST_HEAD        (zombies);
2071         unsigned long     flags;
2072
2073         LASSERT (error != 0);
2074         LASSERT (!in_interrupt());
2075
2076         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2077
2078         if (active) {
2079                 LASSERT (peer->ibp_connecting > 0);
2080                 peer->ibp_connecting--;
2081         } else {
2082                 LASSERT (peer->ibp_accepting > 0);
2083                 peer->ibp_accepting--;
2084         }
2085
2086         if (peer->ibp_connecting != 0 ||
2087             peer->ibp_accepting != 0) {
2088                 /* another connection attempt under way... */
2089                 write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2090                 return;
2091         }
2092
2093         if (list_empty(&peer->ibp_conns)) {
2094                 /* Take peer's blocked transmits to complete with error */
2095                 list_add(&zombies, &peer->ibp_tx_queue);
2096                 list_del_init(&peer->ibp_tx_queue);
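                     /* NB: list_add() + list_del_init() above move the whole
                      * queue in O(1): 'zombies' is spliced in where the queue
                      * head was, then the (now empty) head is re-initialised;
                      * the blocked txs are never touched individually. */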
2097
2098                 if (kiblnd_peer_active(peer))
2099                         kiblnd_unlink_peer_locked(peer);
2100
2101                 peer->ibp_error = error;
2102         } else {
2103                 /* Can't have blocked transmits if there are connections */
2104                 LASSERT (list_empty(&peer->ibp_tx_queue));
2105         }
2106
2107         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2108
2109         kiblnd_peer_notify(peer);
2110
2111         if (list_empty (&zombies))
2112                 return;
2113
2114         CDEBUG (D_NETERROR, "Deleting messages for %s: connection failed\n",
2115                 libcfs_nid2str(peer->ibp_nid));
2116
2117         kiblnd_txlist_done(peer->ibp_ni, &zombies, -EHOSTUNREACH);
2118 }
2119
2120 void
2121 kiblnd_connreq_done(kib_conn_t *conn, int status)
2122 {
2123         struct list_head   txs;
2124
2125         kib_peer_t        *peer = conn->ibc_peer;
2126         int                active;
2127         unsigned long      flags;
2128         kib_tx_t          *tx;
2129
2130         active = (conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2131
2132         CDEBUG(D_NET, "%s: active %d, status %d\n",
2133                libcfs_nid2str(peer->ibp_nid), active, status);
2134
2135         LASSERT (!in_interrupt());
2136         LASSERT ((conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT &&
2137                   peer->ibp_connecting > 0) ||
2138                  (conn->ibc_state == IBLND_CONN_PASSIVE_WAIT &&
2139                   peer->ibp_accepting > 0));
2140
2141         LIBCFS_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
2142         conn->ibc_connvars = NULL;
2143
2144         if (status != 0) {
2145                 /* failed to establish connection */
2146                 kiblnd_peer_connect_failed(peer, active, status);
2147                 kiblnd_finalise_conn(conn);
2148                 return;
2149         }
2150
2151         /* connection established */
2152         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2153
2154         conn->ibc_last_send = jiffies;
2155         kiblnd_set_conn_state(conn, IBLND_CONN_ESTABLISHED);
2156         kiblnd_peer_alive(peer);
2157
2158         /* Add conn to peer's list and nuke any dangling conns from a different
2159          * peer instance... */
2160         kiblnd_conn_addref(conn);               /* +1 ref for ibc_list */
2161         list_add(&conn->ibc_list, &peer->ibp_conns);
2162         if (active)
2163                 peer->ibp_connecting--;
2164         else
2165                 peer->ibp_accepting--;
2166
2167         kiblnd_close_stale_conns_locked(peer, conn->ibc_incarnation);
2168
2169         /* grab pending txs while I have the lock */
2170         list_add(&txs, &peer->ibp_tx_queue);
2171         list_del_init(&peer->ibp_tx_queue);
2172
2173         if (!kiblnd_peer_active(peer) ||        /* peer has been deleted */
2174             conn->ibc_comms_error != 0) {       /* error has happened already */
2175                 lnet_ni_t *ni = peer->ibp_ni;
2176
2177                 /* start to shut down connection */
2178                 kiblnd_close_conn_locked(conn, -ECONNABORTED);
2179                 write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2180
2181                 kiblnd_txlist_done(ni, &txs, -ECONNABORTED);
2182
2183                 return;
2184         }
2185
2186         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2187
2188         /* Schedule blocked txs */
2189         spin_lock (&conn->ibc_lock);
2190         while (!list_empty (&txs)) {
2191                 tx = list_entry (txs.next, kib_tx_t, tx_list);
2192                 list_del (&tx->tx_list);
2193
2194                 kiblnd_queue_tx_locked(tx, conn);
2195         }
2196         spin_unlock (&conn->ibc_lock);
2197
2198         kiblnd_check_sends(conn);
2199
2200         /* schedule blocked rxs */
2201         kiblnd_handle_early_rxs(conn);
2202 }
2203
2204 void
2205 kiblnd_reject(struct rdma_cm_id *cmid, int why)
2206 {
2207         int          rc;
2208         kib_rej_t    rej = {.ibr_magic   = IBLND_MSG_MAGIC,
2209                             .ibr_version = IBLND_MSG_VERSION,
2210                             .ibr_why     = why};
2211
2212         rc = rdma_reject(cmid, &rej, sizeof(rej));
2213
2214         if (rc != 0)
2215                 CWARN("Error %d sending reject\n", rc);
2216 }
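     /* NB: the kib_rej_t above travels as the private data of the CM reject,
      * so the peer's kiblnd_rejected() can recover my magic, protocol version
      * and the reason even though no connection was ever established. */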
2217
2218 int
2219 kiblnd_passive_connect (struct rdma_cm_id *cmid, void *priv, int priv_nob)
2220 {
2221         kib_msg_t             *ackmsg;
2222         kib_msg_t             *reqmsg = priv;
2223         rwlock_t              *g_lock = &kiblnd_data.kib_global_lock;
2224         struct rdma_conn_param cp;
2225         unsigned long          flags;
2226         lnet_ni_t             *ni = NULL;
2227         kib_dev_t             *ibdev;
2228         kib_peer_t            *peer;
2229         kib_peer_t            *peer2;
2230         kib_conn_t            *conn;
2231         lnet_nid_t             nid;
2232         int                    rc;
2233         int                    rej = IBLND_REJECT_FATAL;
2234
2235         LASSERT (!in_interrupt());
2236
2237         /* cmid inherits 'context' from the corresponding listener id */
2238         ibdev = (kib_dev_t *)cmid->context;
2239         LASSERT (ibdev != NULL);
2240
2241         if (priv_nob < offsetof(kib_msg_t, ibm_type)) {
2242                 CERROR("Short connection request\n");
2243                 goto failed;
2244         }
2245
2246         if (reqmsg->ibm_magic == LNET_PROTO_MAGIC ||
2247             reqmsg->ibm_magic == __swab32(LNET_PROTO_MAGIC) ||
2248             (reqmsg->ibm_magic == IBLND_MSG_MAGIC &&
2249              reqmsg->ibm_version != IBLND_MSG_VERSION) ||
2250             (reqmsg->ibm_magic == __swab32(IBLND_MSG_MAGIC) &&
2251              reqmsg->ibm_version != __swab16(IBLND_MSG_VERSION))) {
2252                 /* Future protocol version compatibility support!  If the
2253                  * o2iblnd-specific protocol changes, or when LNET unifies
2254                  * protocols over all LNDs, the initial connection will
2255                  * negotiate a protocol version.  I trap this here to avoid
2256                  * console errors; the reject tells the peer which protocol I
2257                  * speak. */
2258                 goto failed;
2259         }
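             /* NB: matching both IBLND_MSG_MAGIC and its byte-swapped form
              * above lets an opposite-endian or future-protocol peer be
              * recognised and rejected cleanly instead of being logged as a
              * garbled request. */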
2260
2261         rc = kiblnd_unpack_msg(reqmsg, priv_nob);
2262         if (rc != 0) {
2263                 CERROR("Can't parse connection request: %d\n", rc);
2264                 goto failed;
2265         }
2266
2267         nid = reqmsg->ibm_srcnid;
2268
2269         if (reqmsg->ibm_type != IBLND_MSG_CONNREQ) {
2270                 CERROR("Unexpected connreq msg type: %x from %s\n",
2271                        reqmsg->ibm_type, libcfs_nid2str(nid));
2272                 goto failed;
2273         }
2274
2275         if (reqmsg->ibm_u.connparams.ibcp_queue_depth != IBLND_MSG_QUEUE_SIZE) {
2276                 CERROR("Can't accept %s: incompatible queue depth %d (%d wanted)\n",
2277                        libcfs_nid2str(nid),
2278                        reqmsg->ibm_u.connparams.ibcp_queue_depth,
2279                        IBLND_MSG_QUEUE_SIZE);
2280                 goto failed;
2281         }
2282
2283         if (reqmsg->ibm_u.connparams.ibcp_max_frags != IBLND_MAX_RDMA_FRAGS) {
2284                 CERROR("Can't accept %s: incompatible max_frags %d (%d wanted)\n",
2285                        libcfs_nid2str(nid),
2286                        reqmsg->ibm_u.connparams.ibcp_max_frags,
2287                        IBLND_MAX_RDMA_FRAGS);
2288                 goto failed;
2289         }
2290
2291         if (reqmsg->ibm_u.connparams.ibcp_max_msg_size > IBLND_MSG_SIZE) {
2292                 CERROR("Can't accept %s: message size %d too big (%d max)\n",
2293                        libcfs_nid2str(nid),
2294                        reqmsg->ibm_u.connparams.ibcp_max_msg_size,
2295                        IBLND_MSG_SIZE);
2296                 goto failed;
2297         }
2298
2299         ni = lnet_net2ni(LNET_NIDNET(reqmsg->ibm_dstnid));
2300         if (ni == NULL ||                               /* no matching net */
2301             ni->ni_nid != reqmsg->ibm_dstnid ||   /* right NET, wrong NID! */
2302             ((kib_net_t*)ni->ni_data)->ibn_dev != ibdev) { /* wrong device */
2303                 CERROR("Can't accept %s: bad dst nid %s\n",
2304                        libcfs_nid2str(nid),
2305                        libcfs_nid2str(reqmsg->ibm_dstnid));
2306
2307                 goto failed;
2308         }
2309         
2310         /* assume 'nid' is a new peer; create one */
2311         rc = kiblnd_create_peer(ni, &peer, nid);
2312         if (rc != 0) {
2313                 CERROR("Can't create peer for %s\n", libcfs_nid2str(nid));
2314                 rej = IBLND_REJECT_NO_RESOURCES;
2315                 goto failed;
2316         }
2317
2318         write_lock_irqsave(g_lock, flags);
2319
2320         peer2 = kiblnd_find_peer_locked(nid);
2321         if (peer2 != NULL) {
2322                 /* tie-break connection race in favour of the higher NID */                
2323                 if (peer2->ibp_connecting != 0 &&
2324                     nid < ni->ni_nid) {
2325                         write_unlock_irqrestore(g_lock, flags);
2326
2327                         CWARN("Conn race %s\n",
2328                               libcfs_nid2str(peer2->ibp_nid));
2329
2330                         kiblnd_peer_decref(peer);
2331                         rej = IBLND_REJECT_CONN_RACE;
2332                         goto failed;
2333                 }
2334
2335                 peer2->ibp_accepting++;
2336                 kiblnd_peer_addref(peer2);
2337
2338                 write_unlock_irqrestore(g_lock, flags);
2339                 kiblnd_peer_decref(peer);
2340                 peer = peer2;
2341         } else {
2342                 /* Brand new peer */
2343                 LASSERT (peer->ibp_accepting == 0);
2344                 peer->ibp_accepting = 1;
2345
2346                 /* I have a ref on ni that prevents it being shut down */
2347                 LASSERT (((kib_net_t *)ni->ni_data)->ibn_shutdown == 0);
2348
2349                 kiblnd_peer_addref(peer);
2350                 list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));
2351
2352                 write_unlock_irqrestore(g_lock, flags);
2353         }
2354
2355         conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_PASSIVE_WAIT);
2356         if (conn == NULL) {
2357                 kiblnd_peer_connect_failed(peer, 0, -ENOMEM);
2358                 kiblnd_peer_decref(peer);
2359                 rej = IBLND_REJECT_NO_RESOURCES;
2360                 goto failed;
2361         }
2362
2363         /* conn now "owns" cmid, so I return success from here on to ensure the
2364          * CM callback doesn't destroy cmid. */
2365
2366         conn->ibc_incarnation      = reqmsg->ibm_srcstamp;
2367         conn->ibc_credits          = IBLND_MSG_QUEUE_SIZE;
2368         conn->ibc_reserved_credits = IBLND_MSG_QUEUE_SIZE;
2369         LASSERT (conn->ibc_credits + conn->ibc_reserved_credits
2370                  <= IBLND_RX_MSGS);
2371
2372         ackmsg = &conn->ibc_connvars->cv_msg;
2373         memset(ackmsg, 0, sizeof(*ackmsg));
2374
2375         kiblnd_init_msg(ackmsg, IBLND_MSG_CONNACK,
2376                         sizeof(ackmsg->ibm_u.connparams));
2377         ackmsg->ibm_u.connparams.ibcp_queue_depth = IBLND_MSG_QUEUE_SIZE;
2378         ackmsg->ibm_u.connparams.ibcp_max_frags = IBLND_MAX_RDMA_FRAGS;
2379         ackmsg->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;
2380         kiblnd_pack_msg(ni, ackmsg, 0, nid, reqmsg->ibm_srcstamp);
2381
2382         memset(&cp, 0, sizeof(cp));
2383         cp.private_data        = ackmsg;
2384         cp.private_data_len    = ackmsg->ibm_nob;
2385         cp.responder_resources = 0;             /* No atomic ops or RDMA reads */
2386         cp.initiator_depth     = 0;
2387         cp.flow_control        = 1;
2388         cp.retry_count         = *kiblnd_tunables.kib_retry_count;
2389         cp.rnr_retry_count     = *kiblnd_tunables.kib_rnr_retry_count;
2390
2391         CDEBUG(D_NET, "Accept %s\n", libcfs_nid2str(nid));
2392
2393         rc = rdma_accept(cmid, &cp);
2394         if (rc != 0) {
2395                 CERROR("Can't accept %s: %d\n", libcfs_nid2str(nid), rc);
2396                 kiblnd_reject(cmid, IBLND_REJECT_FATAL);
2397                 kiblnd_connreq_done(conn, rc);
2398                 kiblnd_conn_decref(conn);
2399         }
2400
2401         lnet_ni_decref(ni);
2402         return 0;
2403
2404  failed:
2405         if (ni != NULL)
2406                 lnet_ni_decref(ni);
2407
2408         kiblnd_reject(cmid, rej);
2409         return -ECONNREFUSED;
2410 }
2411
2412 void
2413 kiblnd_reconnect (kib_conn_t *conn, char *why)
2414 {
2415         kib_peer_t    *peer = conn->ibc_peer;
2416         int            retry = 0;
2417         unsigned long  flags;
2418         
2419         LASSERT (conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2420         LASSERT (peer->ibp_connecting > 0);     /* 'conn' at least */
2421
2422         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2423
2424         /* retry connection if it's still needed and no other connection
2425          * attempts (active or passive) are in progress */
2426         if (!list_empty(&peer->ibp_tx_queue) &&
2427             peer->ibp_connecting == 1 &&
2428             peer->ibp_accepting == 0) {
2429                 retry = 1;
2430                 peer->ibp_connecting++;
2431         }
2432         
2433         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2434
2435         if (retry) {
2436                 CDEBUG(D_NETERROR, "%s: retrying (%s)\n", 
2437                        libcfs_nid2str(peer->ibp_nid), why);
2438                 kiblnd_connect_peer(peer);
2439         }
2440 }
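     /* NB: the ibp_connecting++ above reserves the retry before the lock is
      * dropped; kiblnd_connect_peer() launches the new attempt while the
      * failed attempt's kiblnd_peer_connect_failed() drops its own count as
      * usual. */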
2441
2442 void
2443 kiblnd_rejected (kib_conn_t *conn, int reason, void *priv, int priv_nob)
2444 {
2445         kib_peer_t    *peer = conn->ibc_peer;
2446
2447         LASSERT (!in_interrupt());
2448         LASSERT (conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2449
2450         switch (reason) {
2451         case IB_CM_REJ_STALE_CONN:
2452                 kiblnd_reconnect(conn, "stale");
2453                 break;
2454
2455         case IB_CM_REJ_CONSUMER_DEFINED:
2456                 if (priv_nob >= sizeof(kib_rej_t)) {
2457                         kib_rej_t *rej = priv;
2458
2459                         if (rej->ibr_magic == __swab32(IBLND_MSG_MAGIC) ||
2460                             rej->ibr_magic == __swab32(LNET_PROTO_MAGIC)) {
2461                                 __swab32s(&rej->ibr_magic);
2462                                 __swab16s(&rej->ibr_version);
2463                         }
2464
2465                         if (rej->ibr_magic != IBLND_MSG_MAGIC &&
2466                             rej->ibr_magic != LNET_PROTO_MAGIC) {
2467                                 CERROR("%s rejected: consumer defined fatal error\n",
2468                                        libcfs_nid2str(peer->ibp_nid));
2469                                 break;
2470                         }
2471                         
2472                         if (rej->ibr_version != IBLND_MSG_VERSION) {
2473                                 CERROR("%s rejected: incompatible o2iblnd protocol version %d\n",
2474                                        libcfs_nid2str(peer->ibp_nid),
2475                                        rej->ibr_version);
2476                                 break;
2477                         }
2478                         
2479                         switch (rej->ibr_why) {
2480                         case IBLND_REJECT_CONN_RACE:
2481                                 kiblnd_reconnect(conn, "conn race");
2482                                 break;
2483                                 
2484                         case IBLND_REJECT_NO_RESOURCES:
2485                                 CERROR("%s rejected: o2iblnd no resources\n",
2486                                        libcfs_nid2str(peer->ibp_nid));
2487                                 break;
2488                         case IBLND_REJECT_FATAL:
2489                                 CERROR("%s rejected: o2iblnd fatal error\n",
2490                                        libcfs_nid2str(peer->ibp_nid));
2491                                 break;
2492                         default:
2493                                 CERROR("%s rejected: o2iblnd reason %d\n",
2494                                        libcfs_nid2str(peer->ibp_nid),
2495                                        rej->ibr_why);
2496                                 break;
2497                         }
2498                         break;
2499                 }
2500                 /* fall through */
2501         default:
2502                 CDEBUG(D_NETERROR, "%s rejected: reason %d, size %d\n",
2503                        libcfs_nid2str(peer->ibp_nid), reason, priv_nob);
2504                 break;
2505         }
2506
2507         kiblnd_connreq_done(conn, -ECONNREFUSED);
2508 }
2509
2510 void
2511 kiblnd_check_connreply (kib_conn_t *conn, void *priv, int priv_nob)
2512 {
2513         kib_peer_t    *peer = conn->ibc_peer;
2514         lnet_ni_t     *ni = peer->ibp_ni;
2515         kib_net_t     *net = ni->ni_data;
2516         kib_msg_t     *msg = priv;
2517         int            rc = kiblnd_unpack_msg(msg, priv_nob);
2518         unsigned long  flags;
2519
2520         LASSERT (net != NULL);
2521
2522         if (rc != 0) {
2523                 CERROR("Can't unpack connack from %s: %d\n",
2524                        libcfs_nid2str(peer->ibp_nid), rc);
2525                 goto failed;
2526         }
2527
2528         if (msg->ibm_type != IBLND_MSG_CONNACK) {
2529                 CERROR("Unexpected message %d from %s\n",
2530                        msg->ibm_type, libcfs_nid2str(peer->ibp_nid));
2531                 rc = -EPROTO;
2532                 goto failed;
2533         }
2534
2535         if (msg->ibm_u.connparams.ibcp_queue_depth != IBLND_MSG_QUEUE_SIZE) {
2536                 CERROR("%s has incompatible queue depth %d (%d wanted)\n",
2537                        libcfs_nid2str(peer->ibp_nid),
2538                        msg->ibm_u.connparams.ibcp_queue_depth,
2539                        IBLND_MSG_QUEUE_SIZE);
2540                 rc = -EPROTO;
2541                 goto failed;
2542         }
2543
2544         if (msg->ibm_u.connparams.ibcp_max_frags != IBLND_MAX_RDMA_FRAGS) {
2545                 CERROR("%s has incompatible max_frags %d (%d wanted)\n",
2546                        libcfs_nid2str(peer->ibp_nid),
2547                        msg->ibm_u.connparams.ibcp_max_frags,
2548                        IBLND_MAX_RDMA_FRAGS);
2549                 rc = -EPROTO;
2550                 goto failed;
2551         }
2552
2553         if (msg->ibm_u.connparams.ibcp_max_msg_size > IBLND_MSG_SIZE) {
2554                 CERROR("%s max message size %d too big (%d max)\n",
2555                        libcfs_nid2str(peer->ibp_nid),
2556                        msg->ibm_u.connparams.ibcp_max_msg_size,
2557                        IBLND_MSG_SIZE);
2558                 rc = -EPROTO;
2559                 goto failed;
2560         }
2561
2562         read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2563         if (msg->ibm_dstnid == ni->ni_nid &&
2564             msg->ibm_dststamp == net->ibn_incarnation)
2565                 rc = 0;
2566         else
2567                 rc = -ESTALE;
2568         read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2569
2570         if (rc != 0) {
2571                 CERROR("Stale connection reply from %s\n",
2572                        libcfs_nid2str(peer->ibp_nid));
2573                 goto failed;
2574         }
2575
2576         conn->ibc_incarnation      = msg->ibm_srcstamp;
2577         conn->ibc_credits          = IBLND_MSG_QUEUE_SIZE;
2578         conn->ibc_reserved_credits = IBLND_MSG_QUEUE_SIZE;
2579         LASSERT (conn->ibc_credits + conn->ibc_reserved_credits
2580                  <= IBLND_RX_MSGS);
2581
2582         kiblnd_connreq_done(conn, 0);
2583         return;
2584
2585  failed:
2586         /* NB My QP has already established itself, so I handle anything going
2587          * wrong here by setting ibc_comms_error.
2588          * kiblnd_connreq_done(0) moves the conn state to ESTABLISHED, but then
2589          * immediately tears it down. */
2590
2591         LASSERT (rc != 0);
2592         conn->ibc_comms_error = rc;
2593         kiblnd_connreq_done(conn, 0);
2594 }
2595
2596 int
2597 kiblnd_active_connect (struct rdma_cm_id *cmid)
2598 {
2599         kib_peer_t              *peer = (kib_peer_t *)cmid->context;
2600         kib_conn_t              *conn;
2601         kib_msg_t               *msg;
2602         struct rdma_conn_param   cp;
2603         int                      rc;
2604
2605         conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_ACTIVE_CONNECT);
2606         if (conn == NULL) {
2607                 kiblnd_peer_connect_failed(peer, 1, -ENOMEM);
2608                 kiblnd_peer_decref(peer); /* lose cmid's ref */
2609                 return -ENOMEM;
2610         }
2611
2612         /* conn "owns" cmid now, so I return success from here on to ensure the
2613          * CM callback doesn't destroy cmid. conn also takes over cmid's ref
2614          * on peer */
2615
2616         msg = &conn->ibc_connvars->cv_msg;
2617
2618         memset(msg, 0, sizeof(*msg));
2619         kiblnd_init_msg(msg, IBLND_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
2620         msg->ibm_u.connparams.ibcp_queue_depth = IBLND_MSG_QUEUE_SIZE;
2621         msg->ibm_u.connparams.ibcp_max_frags = IBLND_MAX_RDMA_FRAGS;
2622         msg->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;
2623         kiblnd_pack_msg(peer->ibp_ni, msg, 0, peer->ibp_nid, 0);
2624         
2625         memset(&cp, 0, sizeof(cp));
2626         cp.private_data        = msg;
2627         cp.private_data_len    = msg->ibm_nob;
2628         cp.responder_resources = 0;             /* No atomic ops or RDMA reads */
2629         cp.initiator_depth     = 0;
2630         cp.flow_control        = 1;
2631         cp.retry_count         = *kiblnd_tunables.kib_retry_count;
2632         cp.rnr_retry_count     = *kiblnd_tunables.kib_rnr_retry_count;
2633
2634         LASSERT(cmid->context == (void *)conn);
2635         LASSERT(conn->ibc_cmid == cmid);
2636         
2637         rc = rdma_connect(cmid, &cp);
2638         if (rc != 0) {
2639                 CERROR("Can't connect to %s: %d\n",
2640                        libcfs_nid2str(peer->ibp_nid), rc);
2641                 kiblnd_connreq_done(conn, rc);
2642                 kiblnd_conn_decref(conn);
2643         }
2644
2645         return 0;
2646 }
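     /* NB: active and passive sides build identical rdma_conn_param blocks;
      * responder_resources and initiator_depth stay 0 because, as noted
      * above, the protocol moves bulk data with RDMA writes only (no RDMA
      * reads or atomics). */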
2647
2648 int
2649 kiblnd_cm_callback(struct rdma_cm_id *cmid, struct rdma_cm_event *event)
2650 {
2651         kib_peer_t  *peer;
2652         kib_conn_t  *conn;
2653         int          rc;
2654
2655         switch (event->event) {
2656         default:
2657                 LBUG();
2658
2659         case RDMA_CM_EVENT_CONNECT_REQUEST:
2660                 /* destroy cmid on failure */
2661                 rc = kiblnd_passive_connect(cmid, 
2662                                             (void *)KIBLND_CONN_PARAM(event),
2663                                             KIBLND_CONN_PARAM_LEN(event));
2664                 CDEBUG(D_NET, "connreq: %d\n", rc);
2665                 return rc;
2666                 
2667         case RDMA_CM_EVENT_ADDR_ERROR:
2668                 peer = (kib_peer_t *)cmid->context;
2669                 CDEBUG(D_NETERROR, "%s: ADDR ERROR %d\n",
2670                        libcfs_nid2str(peer->ibp_nid), event->status);
2671                 kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
2672                 kiblnd_peer_decref(peer);
2673                 return -EHOSTUNREACH;      /* rc != 0 destroys cmid */
2674
2675         case RDMA_CM_EVENT_ADDR_RESOLVED:
2676                 peer = (kib_peer_t *)cmid->context;
2677
2678                 CDEBUG(D_NET,"%s Addr resolved: %d\n",
2679                        libcfs_nid2str(peer->ibp_nid), event->status);
2680
2681                 if (event->status != 0) {
2682                         CDEBUG(D_NETERROR, "Can't resolve address for %s: %d\n",
2683                                libcfs_nid2str(peer->ibp_nid), event->status);
2684                         rc = event->status;
2685                 } else {
2686                         rc = rdma_resolve_route(
2687                                 cmid, *kiblnd_tunables.kib_timeout * 1000);
2688                         if (rc == 0)
2689                                 return 0;
2690                         /* Can't initiate route resolution */
2691                         CERROR("Can't resolve route for %s: %d\n",
2692                                libcfs_nid2str(peer->ibp_nid), rc);
2693                 }
2694                 kiblnd_peer_connect_failed(peer, 1, rc);
2695                 kiblnd_peer_decref(peer);
2696                 return rc;                      /* rc != 0 destroys cmid */
2697
2698         case RDMA_CM_EVENT_ROUTE_ERROR:
2699                 peer = (kib_peer_t *)cmid->context;
2700                 CDEBUG(D_NETERROR, "%s: ROUTE ERROR %d\n",
2701                        libcfs_nid2str(peer->ibp_nid), event->status);
2702                 kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
2703                 kiblnd_peer_decref(peer);
2704                 return -EHOSTUNREACH;           /* rc != 0 destroys cmid */
2705
2706         case RDMA_CM_EVENT_ROUTE_RESOLVED:
2707                 peer = (kib_peer_t *)cmid->context;
2708                 CDEBUG(D_NET,"%s Route resolved: %d\n",
2709                        libcfs_nid2str(peer->ibp_nid), event->status);
2710
2711                 if (event->status == 0)
2712                         return kiblnd_active_connect(cmid);
2713
2714                 CDEBUG(D_NETERROR, "Can't resolve route for %s: %d\n",
2715                        libcfs_nid2str(peer->ibp_nid), event->status);
2716                 kiblnd_peer_connect_failed(peer, 1, event->status);
2717                 kiblnd_peer_decref(peer);
2718                 return event->status;           /* rc != 0 destroys cmid */
2719                 
2720         case RDMA_CM_EVENT_UNREACHABLE:
2721                 conn = (kib_conn_t *)cmid->context;
2722                 LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
2723                         conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
2724                 CDEBUG(D_NETERROR, "%s: UNREACHABLE %d\n",
2725                        libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
2726                 kiblnd_connreq_done(conn, -ENETDOWN);
2727                 kiblnd_conn_decref(conn);
2728                 return 0;
2729
2730         case RDMA_CM_EVENT_CONNECT_ERROR:
2731                 conn = (kib_conn_t *)cmid->context;
2732                 LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
2733                         conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
2734                 CDEBUG(D_NETERROR, "%s: CONNECT ERROR %d\n",
2735                        libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
2736                 kiblnd_connreq_done(conn, -ENOTCONN);
2737                 kiblnd_conn_decref(conn);
2738                 return 0;
2739
2740         case RDMA_CM_EVENT_REJECTED:
2741                 conn = (kib_conn_t *)cmid->context;
2742                 switch (conn->ibc_state) {
2743                 default:
2744                         LBUG();
2745
2746                 case IBLND_CONN_PASSIVE_WAIT:
2747                         CERROR ("%s: REJECTED %d\n",
2748                                 libcfs_nid2str(conn->ibc_peer->ibp_nid),
2749                                 event->status);
2750                         kiblnd_connreq_done(conn, -ECONNRESET);
2751                         break;
2752
2753                 case IBLND_CONN_ACTIVE_CONNECT:
2754                         kiblnd_rejected(conn, event->status,
2755                                         (void *)KIBLND_CONN_PARAM(event),
2756                                         KIBLND_CONN_PARAM_LEN(event));
2757                         break;
2758                 }
2759                 kiblnd_conn_decref(conn);
2760                 return 0;
2761
2762         case RDMA_CM_EVENT_ESTABLISHED:
2763                 conn = (kib_conn_t *)cmid->context;
2764                 switch (conn->ibc_state) {
2765                 default:
2766                         LBUG();
2767
2768                 case IBLND_CONN_PASSIVE_WAIT:
2769                         CDEBUG(D_NET, "ESTABLISHED (passive): %s\n",
2770                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
2771                         kiblnd_connreq_done(conn, 0);
2772                         break;
2773
2774                 case IBLND_CONN_ACTIVE_CONNECT:
2775                         CDEBUG(D_NET, "ESTABLISHED (active): %s\n",
2776                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
2777                         kiblnd_check_connreply(conn,
2778                                                (void *)KIBLND_CONN_PARAM(event),
2779                                                KIBLND_CONN_PARAM_LEN(event));
2780                         break;
2781                 }
2782                 /* net keeps its ref on conn! */
2783                 return 0;
2784
2785         case RDMA_CM_EVENT_DISCONNECTED:
2786                 conn = (kib_conn_t *)cmid->context;
2787                 if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
2788                         CERROR("%s DISCONNECTED\n",
2789                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
2790                         kiblnd_connreq_done(conn, -ECONNRESET);
2791                 } else {
2792                         kiblnd_close_conn(conn, 0);
2793                 }
2794                 kiblnd_conn_decref(conn);
2795                 return 0;
2796
2797         case RDMA_CM_EVENT_DEVICE_REMOVAL:
2798                 LCONSOLE_ERROR_MSG(0x131,
2799                                    "Received notification of device removal\n"
2800                                    "Please shutdown LNET to allow this to proceed\n");
2801                 /* Can't remove network from underneath LNET for now, so I have
2802                  * to ignore this */
2803                 return 0;
2804         }
2805 }
2806
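/* Scan the txs on 'txs' (one of conn's tx queues) and return non-zero if
 * any of them has passed its deadline.  Takes and releases ibc_lock; the
 * queue itself is not modified. */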
int
kiblnd_check_txs (kib_conn_t *conn, struct list_head *txs)
{
        kib_tx_t          *tx;
        struct list_head  *ttmp;
        int                timed_out = 0;

        spin_lock(&conn->ibc_lock);

        list_for_each (ttmp, txs) {
                tx = list_entry (ttmp, kib_tx_t, tx_list);

                if (txs != &conn->ibc_active_txs) {
                        LASSERT (tx->tx_queued);
                } else {
                        LASSERT (!tx->tx_queued);
                        LASSERT (tx->tx_waiting || tx->tx_sending != 0);
                }

                if (time_after_eq (jiffies, tx->tx_deadline)) {
                        timed_out = 1;
                        break;
                }
        }

        spin_unlock(&conn->ibc_lock);
        return timed_out;
}

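/* A connection has timed out if any tx on any of its queues (including
 * the active list) has been waiting longer than its deadline. */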
int
kiblnd_conn_timed_out (kib_conn_t *conn)
{
        return kiblnd_check_txs(conn, &conn->ibc_tx_queue) ||
               kiblnd_check_txs(conn, &conn->ibc_tx_noops) ||
               kiblnd_check_txs(conn, &conn->ibc_tx_queue_rsrvd) ||
               kiblnd_check_txs(conn, &conn->ibc_tx_queue_nocred) ||
               kiblnd_check_txs(conn, &conn->ibc_active_txs);
}

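/* Check for RDMA timeouts on every connection of every peer in hash
 * bucket 'idx', closing any connection that has timed out. */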
void
kiblnd_check_conns (int idx)
{
        struct list_head  *peers = &kiblnd_data.kib_peers[idx];
        struct list_head  *ptmp;
        kib_peer_t        *peer;
        kib_conn_t        *conn;
        struct list_head  *ctmp;
        unsigned long      flags;

 again:
        /* NB. We expect to look at all the peers and not find any
         * RDMAs to time out, so we just use a shared lock while we
         * take a look... */
        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, kib_peer_t, ibp_list);

                list_for_each (ctmp, &peer->ibp_conns) {
                        conn = list_entry (ctmp, kib_conn_t, ibc_list);

                        LASSERT (conn->ibc_state == IBLND_CONN_ESTABLISHED);

                        /* In case we have enough credits to return via a
                         * NOOP, but there were no non-blocking tx descs
                         * free to do it last time... */
                        kiblnd_check_sends(conn);

                        if (!kiblnd_conn_timed_out(conn))
                                continue;

                        /* Handle timeout by closing the whole connection.  We
                         * can only be sure RDMA activity has ceased once the
                         * QP has been modified. */

                        kiblnd_conn_addref(conn); /* 1 ref for me... */

                        read_unlock_irqrestore(&kiblnd_data.kib_global_lock,
                                               flags);

                        CERROR("Timed out RDMA with %s\n",
                               libcfs_nid2str(peer->ibp_nid));

                        kiblnd_close_conn(conn, -ETIMEDOUT);
                        kiblnd_conn_decref(conn); /* ...until here */

                        /* start again now that I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
}

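/* Run the RDMA CM disconnect handshake and finalise the connection;
 * only connd may call this, and only on a conn already in CLOSING state. */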
void
kiblnd_disconnect_conn (kib_conn_t *conn)
{
        LASSERT (!in_interrupt());
        LASSERT (current == kiblnd_data.kib_connd);
        LASSERT (conn->ibc_state == IBLND_CONN_CLOSING);

        rdma_disconnect(conn->ibc_cmid);
        kiblnd_finalise_conn(conn);

        kiblnd_peer_notify(conn->ibc_peer);
}

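/* Connection daemon: reaps zombie connections, runs the disconnect
 * handshake for closing connections, and periodically sweeps a chunk of
 * the peer table looking for RDMA timeouts. */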
int
kiblnd_connd (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        kib_conn_t        *conn;
        int                timeout;
        int                i;
        int                dropped_lock;
        int                peer_index = 0;
        unsigned long      deadline = jiffies;

        cfs_daemonize ("kiblnd_connd");
        cfs_block_allsigs ();

        init_waitqueue_entry (&wait, current);
        kiblnd_data.kib_connd = current;

        spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);

        while (!kiblnd_data.kib_shutdown) {

                dropped_lock = 0;

                if (!list_empty (&kiblnd_data.kib_connd_zombies)) {
                        conn = list_entry (kiblnd_data.kib_connd_zombies.next,
                                           kib_conn_t, ibc_list);
                        list_del (&conn->ibc_list);

                        spin_unlock_irqrestore (&kiblnd_data.kib_connd_lock, flags);
                        dropped_lock = 1;

                        kiblnd_destroy_conn(conn);

                        spin_lock_irqsave (&kiblnd_data.kib_connd_lock, flags);
                }

                if (!list_empty (&kiblnd_data.kib_connd_conns)) {
                        conn = list_entry (kiblnd_data.kib_connd_conns.next,
                                           kib_conn_t, ibc_list);
                        list_del (&conn->ibc_list);

                        spin_unlock_irqrestore (&kiblnd_data.kib_connd_lock, flags);
                        dropped_lock = 1;

                        kiblnd_disconnect_conn(conn);
                        kiblnd_conn_decref(conn);

                        spin_lock_irqsave (&kiblnd_data.kib_connd_lock, flags);
                }

                /* careful with the jiffy wrap... */
                timeout = (int)(deadline - jiffies);
                if (timeout <= 0) {
                        const int n = 4;
                        const int p = 1;
                        int       chunk = kiblnd_data.kib_peer_hash_size;

                        spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
                        dropped_lock = 1;

                        /* Time to check for RDMA timeouts on a few more
                         * peers: I do checks every 'p' seconds on a
                         * proportion of the peer table and I need to check
                         * every connection 'n' times within a timeout
                         * interval, to ensure I detect a timeout on any
                         * connection within (n+1)/n times the timeout
                         * interval.  E.g. with a 50 second timeout and a
                         * 101-bucket peer table, this sweeps
                         * 101 * 4 * 1 / 50 = 8 buckets per second. */

                        if (*kiblnd_tunables.kib_timeout > n * p)
                                chunk = (chunk * n * p) /
                                        *kiblnd_tunables.kib_timeout;
                        if (chunk == 0)
                                chunk = 1;

                        for (i = 0; i < chunk; i++) {
                                kiblnd_check_conns(peer_index);
                                peer_index = (peer_index + 1) %
                                             kiblnd_data.kib_peer_hash_size;
                        }

                        deadline += p * HZ;
                        spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
                }

                if (dropped_lock)
                        continue;

                /* Nothing to do: block until woken, or for 'timeout' jiffies
                 * at most */
                set_current_state (TASK_INTERRUPTIBLE);
                add_wait_queue (&kiblnd_data.kib_connd_waitq, &wait);
                spin_unlock_irqrestore (&kiblnd_data.kib_connd_lock, flags);

                schedule_timeout (timeout);

                set_current_state (TASK_RUNNING);
                remove_wait_queue (&kiblnd_data.kib_connd_waitq, &wait);
                spin_lock_irqsave (&kiblnd_data.kib_connd_lock, flags);
        }

        spin_unlock_irqrestore (&kiblnd_data.kib_connd_lock, flags);

        kiblnd_thread_fini();
        return (0);
}

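/* Handler for asynchronous QP events; COMM_EST is expected and merely
 * logged, anything else is reported as an error. */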
void
kiblnd_qp_event(struct ib_event *event, void *arg)
{
        kib_conn_t *conn = arg;

        switch (event->event) {
        case IB_EVENT_COMM_EST:
                CDEBUG(D_NET, "%s established\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                return;

        default:
                CERROR("%s: Async QP event type %d\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
                return;
        }
}

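/* Dispatch a work completion to the appropriate handler according to the
 * type encoded in its work request id. */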
void
kiblnd_complete (struct ib_wc *wc)
{
        switch (kiblnd_wreqid2type(wc->wr_id)) {
        default:
                LBUG();

        case IBLND_WID_RDMA:
                /* We only get RDMA completion notification if it fails.  All
                 * subsequent work items, including the final SEND, will fail
                 * too.  However we can't print any more info about the
                 * failing RDMA because 'tx' might already be back on the
                 * idle list or even reused if we didn't manage to post all
                 * our work items */
                CDEBUG(D_NETERROR, "RDMA (tx: %p) failed: %d\n",
                       kiblnd_wreqid2ptr(wc->wr_id), wc->status);
                return;

        case IBLND_WID_TX:
                kiblnd_tx_complete(kiblnd_wreqid2ptr(wc->wr_id), wc->status);
                return;

        case IBLND_WID_RX:
                kiblnd_rx_complete(kiblnd_wreqid2ptr(wc->wr_id), wc->status,
                                   wc->byte_len);
                return;
        }
}

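/* Completion event handler: mark the conn ready and queue it for a
 * scheduler thread, unless it is already scheduled. */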
void
kiblnd_cq_completion (struct ib_cq *cq, void *arg)
{
        /* NB I'm not allowed to schedule this conn once its refcount has
         * reached 0.  Since fundamentally I'm racing with scheduler threads
         * consuming my CQ I could be called after all completions have
         * occurred.  But in this case, ibc_nrx == 0 && ibc_nsends_posted == 0
         * and this CQ is about to be destroyed, so I NOOP. */
        kib_conn_t     *conn = (kib_conn_t *)arg;
        unsigned long   flags;

        LASSERT (cq == conn->ibc_cq);

        spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);

        conn->ibc_ready = 1;

        if (!conn->ibc_scheduled &&
            (conn->ibc_nrx > 0 ||
             conn->ibc_nsends_posted > 0)) {
                kiblnd_conn_addref(conn); /* +1 ref for sched_conns */
                conn->ibc_scheduled = 1;
                list_add_tail(&conn->ibc_sched_list,
                              &kiblnd_data.kib_sched_conns);
                wake_up(&kiblnd_data.kib_sched_waitq);
        }

        spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock, flags);
}

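/* Handler for asynchronous CQ events; these always indicate an error. */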
void
kiblnd_cq_event(struct ib_event *event, void *arg)
{
        kib_conn_t *conn = arg;

        CERROR("%s: async CQ event type %d\n",
               libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
}

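/* Scheduler daemon: drains scheduled conns' CQs one completion at a time,
 * rescheduling the conn (and waking a peer scheduler) whenever more work
 * may be pending. */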
int
kiblnd_scheduler(void *arg)
{
        long            id = (long)arg;
        wait_queue_t    wait;
        char            name[16];
        unsigned long   flags;
        kib_conn_t     *conn;
        struct ib_wc    wc;
        int             rc;
        int             did_something;
        int             busy_loops = 0;

        snprintf(name, sizeof(name), "kiblnd_sd_%02ld", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);

        while (!kiblnd_data.kib_shutdown) {
                if (busy_loops++ >= IBLND_RESCHED) {
                        spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock,
                                               flags);

                        our_cond_resched();
                        busy_loops = 0;

                        spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);
                }

                did_something = 0;

                if (!list_empty(&kiblnd_data.kib_sched_conns)) {
                        conn = list_entry(kiblnd_data.kib_sched_conns.next,
                                          kib_conn_t, ibc_sched_list);
                        /* take over kib_sched_conns' ref on conn... */
                        LASSERT(conn->ibc_scheduled);
                        list_del(&conn->ibc_sched_list);
                        conn->ibc_ready = 0;

                        spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock,
                                               flags);

                        rc = ib_poll_cq(conn->ibc_cq, 1, &wc);
                        if (rc == 0) {
                                rc = ib_req_notify_cq(conn->ibc_cq,
                                                      IB_CQ_NEXT_COMP);
                                if (rc < 0) {
                                        CWARN("%s: ib_req_notify_cq failed: %d, "
                                              "closing connection\n",
                                              libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
                                        kiblnd_close_conn(conn, -EIO);
                                        kiblnd_conn_decref(conn);
                                        spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);
                                        continue;
                                }

                                rc = ib_poll_cq(conn->ibc_cq, 1, &wc);
                        }

                        if (rc < 0) {
                                CWARN("%s: ib_poll_cq failed: %d, "
                                      "closing connection\n",
                                      libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
                                kiblnd_close_conn(conn, -EIO);
                                kiblnd_conn_decref(conn);
                                spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);
                                continue;
                        }

                        spin_lock_irqsave(&kiblnd_data.kib_sched_lock,
                                          flags);

                        if (rc != 0 || conn->ibc_ready) {
                                /* There may be another completion waiting; get
                                 * another scheduler to check while I handle
                                 * this one... */
                                kiblnd_conn_addref(conn); /* +1 ref for sched_conns */
                                list_add_tail(&conn->ibc_sched_list,
                                              &kiblnd_data.kib_sched_conns);
                                wake_up(&kiblnd_data.kib_sched_waitq);
                        } else {
                                conn->ibc_scheduled = 0;
                        }

                        if (rc != 0) {
                                spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock,
                                                       flags);

                                kiblnd_complete(&wc);

                                spin_lock_irqsave(&kiblnd_data.kib_sched_lock,
                                                  flags);
                        }

                        kiblnd_conn_decref(conn); /* ...drop my ref from above */
                        did_something = 1;
                }

                if (did_something)
                        continue;

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue_exclusive(&kiblnd_data.kib_sched_waitq, &wait);
                spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock, flags);

                schedule();
                busy_loops = 0;

                remove_wait_queue(&kiblnd_data.kib_sched_waitq, &wait);
                set_current_state(TASK_RUNNING);
                spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);
        }

        spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock, flags);

        kiblnd_thread_fini();
        return (0);
}