/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/o2iblnd/o2iblnd_cb.c
 *
 * Author: Eric Barton <eric@bartonsoftware.com>
 */

#include "o2iblnd.h"
char *
kiblnd_msgtype2str(int type)
{
        switch (type) {
        case IBLND_MSG_CONNREQ:
                return "CONNREQ";

        case IBLND_MSG_CONNACK:
                return "CONNACK";

        case IBLND_MSG_NOOP:
                return "NOOP";

        case IBLND_MSG_IMMEDIATE:
                return "IMMEDIATE";

        case IBLND_MSG_PUT_REQ:
                return "PUT_REQ";

        case IBLND_MSG_PUT_NAK:
                return "PUT_NAK";

        case IBLND_MSG_PUT_ACK:
                return "PUT_ACK";

        case IBLND_MSG_PUT_DONE:
                return "PUT_DONE";

        case IBLND_MSG_GET_REQ:
                return "GET_REQ";

        case IBLND_MSG_GET_DONE:
                return "GET_DONE";

        default:
                return "???";
        }
}

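/* NB: a tx descriptor may carry up to two LNet messages to finalise: the
 * original send and, for a GET, the reply.  kiblnd_tx_done() below unmaps
 * the tx buffers, drops the tx's connection ref and returns the descriptor
 * to the per-net idle list before calling lnet_finalize() on whatever
 * messages it held. */
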
void
kiblnd_tx_done (lnet_ni_t *ni, kib_tx_t *tx)
{
        lnet_msg_t *lntmsg[2];
        kib_net_t  *net = ni->ni_data;
        int         rc;
        int         i;

        LASSERT (net != NULL);
        LASSERT (!in_interrupt());
        LASSERT (!tx->tx_queued);               /* mustn't be queued for sending */
        LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting sent callback */
        LASSERT (!tx->tx_waiting);              /* mustn't be awaiting peer response */

#if IBLND_MAP_ON_DEMAND
        if (tx->tx_fmr != NULL) {
                rc = ib_fmr_pool_unmap(tx->tx_fmr);
                LASSERT (rc == 0);

                if (tx->tx_status != 0) {
                        rc = ib_flush_fmr_pool(net->ibn_fmrpool);
                        LASSERT (rc == 0);
                }

                tx->tx_fmr = NULL;
        }
#else
        if (tx->tx_nfrags != 0) {
                kiblnd_dma_unmap_sg(net->ibn_dev->ibd_cmid->device,
                                    tx->tx_frags, tx->tx_nfrags, tx->tx_dmadir);
                tx->tx_nfrags = 0;
        }
#endif
        /* tx may have up to 2 lnet msgs to finalise */
        lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
        lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
        rc = tx->tx_status;

        if (tx->tx_conn != NULL) {
                LASSERT (ni == tx->tx_conn->ibc_peer->ibp_ni);

                kiblnd_conn_decref(tx->tx_conn);
                tx->tx_conn = NULL;
        }

        tx->tx_nwrq = 0;
        tx->tx_status = 0;

        spin_lock(&net->ibn_tx_lock);

        list_add(&tx->tx_list, &net->ibn_idle_txs);

        spin_unlock(&net->ibn_tx_lock);

        /* delay finalize until my descs have been freed */
        for (i = 0; i < 2; i++) {
                if (lntmsg[i] == NULL)
                        continue;

                lnet_finalize(ni, lntmsg[i], rc);
        }
}

void
kiblnd_txlist_done (lnet_ni_t *ni, struct list_head *txlist, int status)
{
        kib_tx_t *tx;

        while (!list_empty (txlist)) {
                tx = list_entry (txlist->next, kib_tx_t, tx_list);

                list_del (&tx->tx_list);
                /* complete now */
                tx->tx_waiting = 0;
                tx->tx_status = status;
                kiblnd_tx_done(ni, tx);
        }
}

kib_tx_t *
kiblnd_get_idle_tx (lnet_ni_t *ni)
{
        kib_net_t     *net = ni->ni_data;
        kib_tx_t      *tx;

        LASSERT (net != NULL);

        spin_lock(&net->ibn_tx_lock);

        if (list_empty(&net->ibn_idle_txs)) {
                spin_unlock(&net->ibn_tx_lock);
                return NULL;
        }

        tx = list_entry(net->ibn_idle_txs.next, kib_tx_t, tx_list);
        list_del(&tx->tx_list);

        /* Allocate a new completion cookie.  It might not be needed,
         * but we've got a lock right now and we're unlikely to
         * wrap... */
        tx->tx_cookie = kiblnd_data.kib_next_tx_cookie++;

        spin_unlock(&net->ibn_tx_lock);

        LASSERT (tx->tx_nwrq == 0);
        LASSERT (!tx->tx_queued);
        LASSERT (tx->tx_sending == 0);
        LASSERT (!tx->tx_waiting);
        LASSERT (tx->tx_status == 0);
        LASSERT (tx->tx_conn == NULL);
        LASSERT (tx->tx_lntmsg[0] == NULL);
        LASSERT (tx->tx_lntmsg[1] == NULL);
#if IBLND_MAP_ON_DEMAND
        LASSERT (tx->tx_fmr == NULL);
#else
        LASSERT (tx->tx_nfrags == 0);
#endif

        return tx;
}

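/* Flow control is credit-based: each message carries the number of receive
 * buffers the sender has reposted since it last transmitted.  The 'credit'
 * argument to kiblnd_post_rx() below says how the repost should be
 * accounted: IBLND_POSTRX_NO_CREDIT returns nothing to the peer,
 * IBLND_POSTRX_PEER_CREDIT returns the credit this rx arrived on, and
 * IBLND_POSTRX_RSRVD_CREDIT refunds a reserved credit. */
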
void
kiblnd_drop_rx (kib_rx_t *rx)
{
        kib_conn_t         *conn = rx->rx_conn;
        unsigned long       flags;

        spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);
        LASSERT (conn->ibc_nrx > 0);
        conn->ibc_nrx--;
        spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock, flags);

        kiblnd_conn_decref(conn);
}

int
kiblnd_post_rx (kib_rx_t *rx, int credit)
{
        kib_conn_t         *conn = rx->rx_conn;
        kib_net_t          *net = conn->ibc_peer->ibp_ni->ni_data;
        struct ib_recv_wr  *bad_wrq;
        int                 rc;

        LASSERT (net != NULL);
        LASSERT (!in_interrupt());
        LASSERT (credit == IBLND_POSTRX_NO_CREDIT ||
                 credit == IBLND_POSTRX_PEER_CREDIT ||
                 credit == IBLND_POSTRX_RSRVD_CREDIT);

        rx->rx_sge.length = IBLND_MSG_SIZE;
        rx->rx_sge.lkey = net->ibn_dev->ibd_mr->lkey;
        rx->rx_sge.addr = rx->rx_msgaddr;

        rx->rx_wrq.next = NULL;
        rx->rx_wrq.sg_list = &rx->rx_sge;
        rx->rx_wrq.num_sge = 1;
        rx->rx_wrq.wr_id = kiblnd_ptr2wreqid(rx, IBLND_WID_RX);

        LASSERT (conn->ibc_state >= IBLND_CONN_INIT);
        LASSERT (rx->rx_nob >= 0);              /* not posted */

        if (conn->ibc_state > IBLND_CONN_ESTABLISHED) {
                kiblnd_drop_rx(rx);             /* No more posts for this rx */
                return 0;
        }

        rx->rx_nob = -1;                        /* flag posted */

        rc = ib_post_recv(conn->ibc_cmid->qp, &rx->rx_wrq, &bad_wrq);

        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) /* Initial post */
                return rc;

        if (rc != 0) {
                CERROR("Can't post rx for %s: %d\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
                kiblnd_close_conn(conn, rc);
                kiblnd_drop_rx(rx);             /* No more posts for this rx */
                return rc;
        }

        if (credit == IBLND_POSTRX_NO_CREDIT)
                return 0;

        spin_lock(&conn->ibc_lock);
        if (credit == IBLND_POSTRX_PEER_CREDIT)
                conn->ibc_outstanding_credits++;
        else
                conn->ibc_reserved_credits++;
        spin_unlock(&conn->ibc_lock);

        kiblnd_check_sends(conn);
        return 0;
}

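/* PUT and GET handshakes are matched by cookie: the active side stashes its
 * tx on ibc_active_txs and the peer quotes the cookie back in its reply.
 * The lookup below must be called with ibc_lock held; a completion that
 * fails to match a waiting tx of the expected type is a protocol error. */
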
kib_tx_t *
kiblnd_find_waiting_tx_locked(kib_conn_t *conn, int txtype, __u64 cookie)
{
        struct list_head   *tmp;

        list_for_each(tmp, &conn->ibc_active_txs) {
                kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);

                LASSERT (!tx->tx_queued);
                LASSERT (tx->tx_sending != 0 || tx->tx_waiting);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_waiting &&
                    tx->tx_msg->ibm_type == txtype)
                        return tx;

                CWARN("Bad completion: %swaiting, type %x (wanted %x)\n",
                      tx->tx_waiting ? "" : "NOT ",
                      tx->tx_msg->ibm_type, txtype);
        }
        return NULL;
}

void
kiblnd_handle_completion(kib_conn_t *conn, int txtype, int status, __u64 cookie)
{
        kib_tx_t    *tx;
        lnet_ni_t   *ni = conn->ibc_peer->ibp_ni;
        int          idle;

        spin_lock(&conn->ibc_lock);

        tx = kiblnd_find_waiting_tx_locked(conn, txtype, cookie);
        if (tx == NULL) {
                spin_unlock(&conn->ibc_lock);

                CWARN("Unmatched completion type %x cookie "LPX64" from %s\n",
                      txtype, cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                kiblnd_close_conn(conn, -EPROTO);
                return;
        }

        if (tx->tx_status == 0) {               /* success so far */
                if (status < 0) {               /* failed? */
                        tx->tx_status = status;
                } else if (txtype == IBLND_MSG_GET_REQ) {
                        lnet_set_reply_msg_len(ni, tx->tx_lntmsg[1], status);
                }
        }

        tx->tx_waiting = 0;

        idle = !tx->tx_queued && (tx->tx_sending == 0);
        if (idle)
                list_del(&tx->tx_list);

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kiblnd_tx_done(ni, tx);
}

void
kiblnd_send_completion (kib_conn_t *conn, int type, int status, __u64 cookie)
{
        lnet_ni_t   *ni = conn->ibc_peer->ibp_ni;
        kib_tx_t    *tx = kiblnd_get_idle_tx(ni);

        if (tx == NULL) {
                CERROR("Can't get tx for completion %x for %s\n",
                       type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                return;
        }

        tx->tx_msg->ibm_u.completion.ibcm_status = status;
        tx->tx_msg->ibm_u.completion.ibcm_cookie = cookie;
        kiblnd_init_tx_msg(ni, tx, type, sizeof(kib_completion_msg_t));

        kiblnd_queue_tx(tx, conn);
}

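/* Dispatch a received message.  Credits piggy-backed on the message are
 * banked first (the connection is closed if the peer overflows the queue
 * size), then the message is handled by type.  Note that a NOOP which
 * carried credits has its own credit returned immediately via
 * ibc_outstanding_credits, so its rx is reposted without further
 * accounting. */
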
void
kiblnd_handle_rx (kib_rx_t *rx)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        lnet_ni_t    *ni = conn->ibc_peer->ibp_ni;
        int           credits = msg->ibm_credits;
        kib_tx_t     *tx;
        int           rc = 0;
        int           rc2;
        int           post_credit;

        LASSERT (conn->ibc_state >= IBLND_CONN_ESTABLISHED);

        CDEBUG (D_NET, "Received %x[%d] from %s\n",
                msg->ibm_type, credits, libcfs_nid2str(conn->ibc_peer->ibp_nid));

        if (credits != 0) {
                /* Have I received credits that will let me send? */
                spin_lock(&conn->ibc_lock);

                if (conn->ibc_credits + credits > IBLND_MSG_QUEUE_SIZE) {
                        rc2 = conn->ibc_credits;
                        spin_unlock(&conn->ibc_lock);

                        CERROR("Bad credits from %s: %d + %d > %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid),
                               rc2, credits, IBLND_MSG_QUEUE_SIZE);

                        kiblnd_close_conn(conn, -EPROTO);
                        kiblnd_post_rx(rx, IBLND_POSTRX_NO_CREDIT);
                        return;
                }

                conn->ibc_credits += credits;

                /* This ensures the credit taken by NOOP can be returned */
                if (msg->ibm_type == IBLND_MSG_NOOP)
                        conn->ibc_outstanding_credits++;

                spin_unlock(&conn->ibc_lock);
                kiblnd_check_sends(conn);
        }

        switch (msg->ibm_type) {
        default:
                CERROR("Bad IBLND message type %x from %s\n",
                       msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                post_credit = IBLND_POSTRX_NO_CREDIT;
                rc = -EPROTO;
                break;

        case IBLND_MSG_NOOP:
                if (credits != 0) /* credit already posted */
                        post_credit = IBLND_POSTRX_NO_CREDIT;
                else              /* a keepalive NOOP */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_IMMEDIATE:
                post_credit = IBLND_POSTRX_DONT_POST;
                rc = lnet_parse(ni, &msg->ibm_u.immediate.ibim_hdr,
                                msg->ibm_srcnid, rx, 0);
                if (rc < 0)                     /* repost on error */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_PUT_REQ:
                post_credit = IBLND_POSTRX_DONT_POST;
                rc = lnet_parse(ni, &msg->ibm_u.putreq.ibprm_hdr,
                                msg->ibm_srcnid, rx, 1);
                if (rc < 0)                     /* repost on error */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_PUT_NAK:
                CWARN ("PUT_NAK from %s\n", libcfs_nid2str(conn->ibc_peer->ibp_nid));
                post_credit = IBLND_POSTRX_RSRVD_CREDIT;
                kiblnd_handle_completion(conn, IBLND_MSG_PUT_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBLND_MSG_PUT_ACK:
                post_credit = IBLND_POSTRX_RSRVD_CREDIT;

                spin_lock(&conn->ibc_lock);
                tx = kiblnd_find_waiting_tx_locked(conn, IBLND_MSG_PUT_REQ,
                                                   msg->ibm_u.putack.ibpam_src_cookie);
                if (tx != NULL)
                        list_del(&tx->tx_list);
                spin_unlock(&conn->ibc_lock);

                if (tx == NULL) {
                        CERROR("Unmatched PUT_ACK from %s\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        rc = -EPROTO;
                        break;
                }

                LASSERT (tx->tx_waiting);
                /* CAVEAT EMPTOR: I could be racing with tx_complete, but...
                 * (a) I can overwrite tx_msg since my peer has received it!
                 * (b) tx_waiting set tells tx_complete() it's not done. */

                tx->tx_nwrq = 0;                /* overwrite PUT_REQ */

                rc2 = kiblnd_init_rdma(ni, tx, IBLND_MSG_PUT_DONE,
                                       kiblnd_rd_size(&msg->ibm_u.putack.ibpam_rd),
                                       &msg->ibm_u.putack.ibpam_rd,
                                       msg->ibm_u.putack.ibpam_dst_cookie);
                if (rc2 < 0)
                        CERROR("Can't setup rdma for PUT to %s: %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc2);

                spin_lock(&conn->ibc_lock);
                tx->tx_waiting = 0;             /* clear waiting and queue atomically */
                kiblnd_queue_tx_locked(tx, conn);
                spin_unlock(&conn->ibc_lock);
                break;

        case IBLND_MSG_PUT_DONE:
                post_credit = IBLND_POSTRX_PEER_CREDIT;
                kiblnd_handle_completion(conn, IBLND_MSG_PUT_ACK,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBLND_MSG_GET_REQ:
                post_credit = IBLND_POSTRX_DONT_POST;
                rc = lnet_parse(ni, &msg->ibm_u.get.ibgm_hdr,
                                msg->ibm_srcnid, rx, 1);
                if (rc < 0)                     /* repost on error */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_GET_DONE:
                post_credit = IBLND_POSTRX_RSRVD_CREDIT;
                kiblnd_handle_completion(conn, IBLND_MSG_GET_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;
        }

        if (rc < 0)                             /* protocol error */
                kiblnd_close_conn(conn, rc);

        if (post_credit != IBLND_POSTRX_DONT_POST)
                kiblnd_post_rx(rx, post_credit);
}

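/* Completion handler for a posted receive.  Besides unpacking and sanity
 * checks, the src/dst NIDs and incarnation stamps are compared against the
 * connection so that messages from a stale incarnation of the peer are
 * rejected with -ESTALE.  Messages racing ahead of connection establishment
 * are stashed on ibc_early_rxs and replayed once the connection is up. */
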
void
kiblnd_rx_complete (kib_rx_t *rx, int status, int nob)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        lnet_ni_t    *ni = conn->ibc_peer->ibp_ni;
        kib_net_t    *net = ni->ni_data;
        unsigned long flags;
        int           rc;
        int           err = -EIO;

        LASSERT (net != NULL);
        LASSERT (rx->rx_nob < 0);               /* was posted */
        rx->rx_nob = 0;                         /* isn't now */

        if (conn->ibc_state > IBLND_CONN_ESTABLISHED)
                goto ignore;

        if (status != IB_WC_SUCCESS) {
                CDEBUG(D_NETERROR, "Rx from %s failed: %d\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid), status);
                goto failed;
        }

        LASSERT (nob >= 0);
        rx->rx_nob = nob;

        rc = kiblnd_unpack_msg(msg, rx->rx_nob);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from %s\n",
                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                goto failed;
        }

        if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
            msg->ibm_dstnid != ni->ni_nid ||
            msg->ibm_srcstamp != conn->ibc_incarnation ||
            msg->ibm_dststamp != net->ibn_incarnation) {
                CERROR ("Stale rx from %s\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
                err = -ESTALE;
                goto failed;
        }

        /* set time last known alive */
        kiblnd_peer_alive(conn->ibc_peer);

        /* racing with connection establishment/teardown! */

        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
                write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
                /* must check holding global lock to eliminate race */
                if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
                        list_add_tail(&rx->rx_list, &conn->ibc_early_rxs);
                        write_unlock_irqrestore(&kiblnd_data.kib_global_lock,
                                                flags);
                        return;
                }
                write_unlock_irqrestore(&kiblnd_data.kib_global_lock,
                                        flags);
        }
        kiblnd_handle_rx(rx);
        return;

 failed:
        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        kiblnd_close_conn(conn, err);
 ignore:
        kiblnd_drop_rx(rx);                     /* Don't re-post rx. */
}

struct page *
kiblnd_kvaddr_to_page (unsigned long vaddr)
{
        struct page *page;

        if (vaddr >= VMALLOC_START &&
            vaddr < VMALLOC_END) {
                page = vmalloc_to_page ((void *)vaddr);
                LASSERT (page != NULL);
                return page;
        }
#ifdef CONFIG_HIGHMEM
        if (vaddr >= PKMAP_BASE &&
            vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE)) {
                /* No highmem pages here: they are only used for bulk (kiov) I/O */
                CERROR("asked to find page for highmem address\n");
                LBUG();
        }
#endif
        page = virt_to_page (vaddr);
        LASSERT (page != NULL);
        return page;
}

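/* Two buffer-mapping strategies follow.  Without IBLND_MAP_ON_DEMAND the
 * payload is described by a scatter/gather list mapped under the device's
 * global memory region.  With it, the payload's pages are mapped through an
 * FMR pool instead, yielding a single virtually-contiguous RDMA region at
 * the cost of requiring the payload to be page-contiguous. */
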
#if !IBLND_MAP_ON_DEMAND
int
kiblnd_setup_rd_iov(lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd,
                    unsigned int niov, struct iovec *iov, int offset, int nob)
{
        struct scatterlist *sg;
        int                 i;
        int                 fragnob;
        unsigned long       vaddr;
        struct page        *page;
        int                 page_offset;
        kib_net_t          *net = ni->ni_data;

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (net != NULL);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        sg = tx->tx_frags;
        do {
                LASSERT (niov > 0);

                vaddr = ((unsigned long)iov->iov_base) + offset;
                page_offset = vaddr & (PAGE_SIZE - 1);
                page = kiblnd_kvaddr_to_page(vaddr);
                if (page == NULL) {
                        CERROR ("Can't find page\n");
                        return -EFAULT;
                }

                fragnob = min((int)(iov->iov_len - offset), nob);
                fragnob = min(fragnob, (int)PAGE_SIZE - page_offset);

                sg_set_page(sg, page, fragnob, page_offset);
                sg++;

                if (offset + fragnob < iov->iov_len) {
                        offset += fragnob;
                } else {
                        offset = 0;
                        iov++;
                        niov--;
                }
                nob -= fragnob;
        } while (nob > 0);

        /* If rd is not tx_rd, it's going to get sent to a peer and I'm the
         * RDMA sink */
        tx->tx_nfrags = sg - tx->tx_frags;
        tx->tx_dmadir = (rd != tx->tx_rd) ? DMA_FROM_DEVICE : DMA_TO_DEVICE;

        rd->rd_nfrags = kiblnd_dma_map_sg(net->ibn_dev->ibd_cmid->device,
                                          tx->tx_frags, tx->tx_nfrags,
                                          tx->tx_dmadir);
        rd->rd_key    = (rd != tx->tx_rd) ?
                        net->ibn_dev->ibd_mr->rkey : net->ibn_dev->ibd_mr->lkey;

        for (i = 0; i < rd->rd_nfrags; i++) {
                rd->rd_frags[i].rf_nob  = kiblnd_sg_dma_len(
                        net->ibn_dev->ibd_cmid->device, &tx->tx_frags[i]);
                rd->rd_frags[i].rf_addr = kiblnd_sg_dma_address(
                        net->ibn_dev->ibd_cmid->device, &tx->tx_frags[i]);
        }

        return 0;
}

int
kiblnd_setup_rd_kiov (lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd,
                      int nkiov, lnet_kiov_t *kiov, int offset, int nob)
{
        struct scatterlist *sg;
        int                 i;
        int                 fragnob;
        kib_net_t          *net = ni->ni_data;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (net != NULL);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        sg = tx->tx_frags;
        do {
                LASSERT (nkiov > 0);

                fragnob = min((int)(kiov->kiov_len - offset), nob);

                memset(sg, 0, sizeof(*sg));
                sg_set_page(sg, kiov->kiov_page, fragnob,
                            kiov->kiov_offset + offset);
                sg++;

                offset = 0;
                kiov++;
                nkiov--;
                nob -= fragnob;
        } while (nob > 0);

        /* If rd is not tx_rd, it's going to get sent to a peer and I'm the
         * RDMA sink */
        tx->tx_nfrags = sg - tx->tx_frags;
        tx->tx_dmadir = (rd != tx->tx_rd) ? DMA_FROM_DEVICE : DMA_TO_DEVICE;

        rd->rd_nfrags = kiblnd_dma_map_sg(net->ibn_dev->ibd_cmid->device,
                                          tx->tx_frags, tx->tx_nfrags, tx->tx_dmadir);
        rd->rd_key    = (rd != tx->tx_rd) ?
                        net->ibn_dev->ibd_mr->rkey : net->ibn_dev->ibd_mr->lkey;

        for (i = 0; i < tx->tx_nfrags; i++) {
                rd->rd_frags[i].rf_nob  = kiblnd_sg_dma_len(
                        net->ibn_dev->ibd_cmid->device, &tx->tx_frags[i]);
                rd->rd_frags[i].rf_addr = kiblnd_sg_dma_address(
                        net->ibn_dev->ibd_cmid->device, &tx->tx_frags[i]);
#if 0
                CDEBUG(D_WARNING,"frag[%d]: "LPX64" for %d\n",
                       i, rd->rd_frags[i].rf_addr, rd->rd_frags[i].rf_nob);
#endif
        }

        return 0;
}
#else
int
kiblnd_map_tx (lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd,
               int npages, unsigned long page_offset, int nob)
{
        struct ib_pool_fmr *fmr;
        kib_net_t          *net = ni->ni_data;

        LASSERT (net != NULL);
        LASSERT (tx->tx_fmr == NULL);
        LASSERT (page_offset < PAGE_SIZE);
        LASSERT (npages >= (1 + ((page_offset + nob - 1)>>PAGE_SHIFT)));
        LASSERT (npages <= LNET_MAX_IOV);

        rd->rd_addr = 0;

        fmr = ib_fmr_pool_map_phys(net->ibn_fmrpool, tx->tx_pages,
                                   npages, rd->rd_addr);
        if (IS_ERR(fmr)) {
                CERROR ("Can't map %d pages: %ld\n", npages, PTR_ERR(fmr));
                return PTR_ERR(fmr);
        }

        /* If rd is not tx_rd, it's going to get sent to a peer, who will need
         * the rkey */

        rd->rd_key = (rd != tx->tx_rd) ? fmr->fmr->rkey : fmr->fmr->lkey;
        rd->rd_nob = nob;

        tx->tx_fmr = fmr;
        return 0;
}

int
kiblnd_setup_rd_iov (lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd,
                     unsigned int niov, struct iovec *iov, int offset, int nob)
{
        int           resid;
        int           fragnob;
        struct page  *page;
        int           npages;
        unsigned long page_offset;
        unsigned long vaddr;

        LASSERT (nob > 0);
        LASSERT (niov > 0);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR ("Can't map multiple vaddr fragments\n");
                return (-EMSGSIZE);
        }

        vaddr = ((unsigned long)iov->iov_base) + offset;

        page_offset = vaddr & (PAGE_SIZE - 1);
        resid = nob;
        npages = 0;

        do {
                LASSERT (npages < LNET_MAX_IOV);

                page = kiblnd_kvaddr_to_page(vaddr);
                if (page == NULL) {
                        CERROR("Can't find page for %lu\n", vaddr);
                        return -EFAULT;
                }

                tx->tx_pages[npages++] = lnet_page2phys(page);

                fragnob = PAGE_SIZE - (vaddr & (PAGE_SIZE - 1));
                vaddr += fragnob;
                resid -= fragnob;

        } while (resid > 0);

        return kiblnd_map_tx(ni, tx, rd, npages, page_offset, nob);
}

int
kiblnd_setup_rd_kiov (lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd,
                      int nkiov, lnet_kiov_t *kiov, int offset, int nob)
{
        int            resid;
        int            npages;
        unsigned long  page_offset;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (nkiov <= LNET_MAX_IOV);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        page_offset = kiov->kiov_offset + offset;

        resid = offset + nob;
        npages = 0;

        do {
                LASSERT (npages < LNET_MAX_IOV);
                LASSERT (nkiov > 0);

                if ((npages > 0 && kiov->kiov_offset != 0) ||
                    (resid > kiov->kiov_len &&
                     (kiov->kiov_offset + kiov->kiov_len) != PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR ("Can't make payload contiguous in I/O VM: "
                                "page %d, offset %d, len %d\n",
                                npages, kiov->kiov_offset, kiov->kiov_len);

                        return -EINVAL;
                }

                tx->tx_pages[npages++] = lnet_page2phys(kiov->kiov_page);
                resid -= kiov->kiov_len;
                kiov++;
                nkiov--;
        } while (resid > 0);

        return kiblnd_map_tx(ni, tx, rd, npages, page_offset, nob);
}
#endif

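/* The send scheduler.  Called whenever something may have changed the set
 * of sendable txs: it drains the reserved-credit queue, generates a NOOP
 * if credits need returning and nothing else is queued, then posts work
 * requests while credits and send slots last.  NB the last credit is kept
 * back for a NOOP so that outstanding credits can always be returned to
 * the peer. */
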
void
kiblnd_check_sends (kib_conn_t *conn)
{
        kib_tx_t          *tx;
        lnet_ni_t         *ni = conn->ibc_peer->ibp_ni;
        int                rc;
        int                consume_cred = 0;
        struct ib_send_wr *bad_wrq;
        int                done;

        /* Don't send anything until after the connection is established */
        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
                CDEBUG(D_NET, "%s too soon\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                return;
        }

        spin_lock(&conn->ibc_lock);

        LASSERT (conn->ibc_nsends_posted <=
                 *kiblnd_tunables.kib_concurrent_sends);
        LASSERT (conn->ibc_reserved_credits >= 0);

        while (conn->ibc_reserved_credits > 0 &&
               !list_empty(&conn->ibc_tx_queue_rsrvd)) {
                tx = list_entry(conn->ibc_tx_queue_rsrvd.next,
                                kib_tx_t, tx_list);
                list_del(&tx->tx_list);
                list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
                conn->ibc_reserved_credits--;
        }

        if (kiblnd_send_noop(conn)) {
                spin_unlock(&conn->ibc_lock);

                tx = kiblnd_get_idle_tx(ni);
                if (tx != NULL)
                        kiblnd_init_tx_msg(ni, tx, IBLND_MSG_NOOP, 0);

                spin_lock(&conn->ibc_lock);

                if (tx != NULL)
                        kiblnd_queue_tx_locked(tx, conn);
        }

        for (;;) {
                if (!list_empty(&conn->ibc_tx_queue_nocred)) {
                        tx = list_entry(conn->ibc_tx_queue_nocred.next,
                                        kib_tx_t, tx_list);
                        consume_cred = 0;
                } else if (!list_empty(&conn->ibc_tx_noops)) {
                        tx = list_entry(conn->ibc_tx_noops.next,
                                        kib_tx_t, tx_list);
                        consume_cred = 1;
                } else if (!list_empty(&conn->ibc_tx_queue)) {
                        tx = list_entry(conn->ibc_tx_queue.next,
                                        kib_tx_t, tx_list);
                        consume_cred = 1;
                } else {
                        /* nothing to send right now */
                        break;
                }

                LASSERT (tx->tx_queued);
                /* We rely on this for QP sizing */
                LASSERT (tx->tx_nwrq > 0 &&
                         tx->tx_nwrq <= 1 + IBLND_MAX_RDMA_FRAGS);

                LASSERT (conn->ibc_outstanding_credits >= 0);
                LASSERT (conn->ibc_outstanding_credits <= IBLND_MSG_QUEUE_SIZE);
                LASSERT (conn->ibc_credits >= 0);
                LASSERT (conn->ibc_credits <= IBLND_MSG_QUEUE_SIZE);

                if (conn->ibc_nsends_posted ==
                    *kiblnd_tunables.kib_concurrent_sends) {
                        /* tx completions outstanding... */
                        CDEBUG(D_NET, "%s: posted enough\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        break;
                }

                if (consume_cred) {
                        if (conn->ibc_credits == 0) {   /* no credits */
                                CDEBUG(D_NET, "%s: no credits\n",
                                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                                break; /* NB ibc_tx_queue_nocred checked */
                        }

                        /* Last credit reserved for NOOP */
                        if (conn->ibc_credits == 1 &&
                            tx->tx_msg->ibm_type != IBLND_MSG_NOOP) {
                                CDEBUG(D_NET, "%s: not using last credit\n",
                                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                                break; /* NB ibc_tx_noops checked */
                        }
                }

                list_del(&tx->tx_list);
                tx->tx_queued = 0;

                /* NB don't drop ibc_lock before bumping tx_sending */

                if (tx->tx_msg->ibm_type == IBLND_MSG_NOOP &&
                    !kiblnd_send_noop(conn)) {
                        /* redundant NOOP */
                        spin_unlock(&conn->ibc_lock);
                        kiblnd_tx_done(ni, tx);
                        spin_lock(&conn->ibc_lock);
                        CDEBUG(D_NET, "%s: redundant noop\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        continue;
                }

                kiblnd_pack_msg(ni, tx->tx_msg, conn->ibc_outstanding_credits,
                                conn->ibc_peer->ibp_nid, conn->ibc_incarnation);

                conn->ibc_outstanding_credits = 0;
                conn->ibc_nsends_posted++;
                if (consume_cred)
                        conn->ibc_credits--;

                /* CAVEAT EMPTOR!  This tx could be the PUT_DONE of an RDMA
                 * PUT.  If so, it was first queued here as a PUT_REQ, sent and
                 * stashed on ibc_active_txs, matched by an incoming PUT_ACK,
                 * and then re-queued here.  It's (just) possible that
                 * tx_sending is non-zero if we've not done the tx_complete() from
                 * the first send; hence the ++ rather than = below. */
                tx->tx_sending++;

                list_add (&tx->tx_list, &conn->ibc_active_txs);
#if 0
                {
                        int i;

                        for (i = 0; i < tx->tx_nwrq - 1; i++) {
                                LASSERT (tx->tx_wrq[i].opcode == IB_WR_RDMA_WRITE);
                                LASSERT (tx->tx_wrq[i].next == &tx->tx_wrq[i+1]);
                                LASSERT (tx->tx_wrq[i].sg_list == &tx->tx_sge[i]);

                                CDEBUG(D_WARNING, "WORK[%d]: RDMA "LPX64
                                       " for %d k %x -> "LPX64" k %x\n", i,
                                       tx->tx_wrq[i].sg_list->addr,
                                       tx->tx_wrq[i].sg_list->length,
                                       tx->tx_wrq[i].sg_list->lkey,
                                       tx->tx_wrq[i].wr.rdma.remote_addr,
                                       tx->tx_wrq[i].wr.rdma.rkey);
                        }

                        LASSERT (tx->tx_wrq[i].opcode == IB_WR_SEND);
                        LASSERT (tx->tx_wrq[i].next == NULL);
                        LASSERT (tx->tx_wrq[i].sg_list == &tx->tx_sge[i]);

                        CDEBUG(D_WARNING, "WORK[%d]: SEND "LPX64" for %d k %x\n", i,
                               tx->tx_wrq[i].sg_list->addr,
                               tx->tx_wrq[i].sg_list->length,
                               tx->tx_wrq[i].sg_list->lkey);
                }
#endif
                /* I'm still holding ibc_lock! */
                if (conn->ibc_state != IBLND_CONN_ESTABLISHED)
                        rc = -ECONNABORTED;
                else
                        rc = ib_post_send(conn->ibc_cmid->qp, tx->tx_wrq, &bad_wrq);

                conn->ibc_last_send = jiffies;

                if (rc != 0) {
                        /* NB credits are transferred in the actual
                         * message, which can only be the last work item */
                        conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
                        if (consume_cred)
                                conn->ibc_credits++;
                        conn->ibc_nsends_posted--;

                        tx->tx_status = rc;
                        tx->tx_waiting = 0;
                        tx->tx_sending--;

                        done = (tx->tx_sending == 0);
                        if (done)
                                list_del (&tx->tx_list);

                        spin_unlock(&conn->ibc_lock);

                        if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
                                CERROR("Error %d posting transmit to %s\n",
                                       rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        else
                                CDEBUG(D_NET, "Error %d posting transmit to %s\n",
                                       rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));

                        kiblnd_close_conn(conn, rc);

                        if (done)
                                kiblnd_tx_done(ni, tx);
                        return;
                }
        }

        spin_unlock(&conn->ibc_lock);
}

void
kiblnd_tx_complete (kib_tx_t *tx, int status)
{
        int           failed = (status != IB_WC_SUCCESS);
        kib_conn_t   *conn = tx->tx_conn;
        int           idle;

        LASSERT (tx->tx_sending > 0);

        if (failed) {
                if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
                        CDEBUG(D_NETERROR, "Tx -> %s cookie "LPX64
                               " sending %d waiting %d: failed %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid),
                               tx->tx_cookie, tx->tx_sending, tx->tx_waiting,
                               status);

                kiblnd_close_conn(conn, -EIO);
        } else {
                kiblnd_peer_alive(conn->ibc_peer);
        }

        spin_lock(&conn->ibc_lock);

        /* I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'. */

        tx->tx_sending--;
        conn->ibc_nsends_posted--;

        if (failed) {
                tx->tx_waiting = 0;             /* don't wait for peer */
                tx->tx_status = -EIO;
        }

        idle = (tx->tx_sending == 0) &&         /* This is the final callback */
               !tx->tx_waiting &&               /* Not waiting for peer */
               !tx->tx_queued;                  /* Not re-queued (PUT_DONE) */
        if (idle)
                list_del(&tx->tx_list);

        kiblnd_conn_addref(conn);               /* 1 ref for me.... */

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kiblnd_tx_done(conn->ibc_peer->ibp_ni, tx);

        kiblnd_check_sends(conn);

        kiblnd_conn_decref(conn);               /* ...until here */
}

void
kiblnd_init_tx_msg (lnet_ni_t *ni, kib_tx_t *tx, int type, int body_nob)
{
        kib_net_t         *net = ni->ni_data;
        struct ib_sge     *sge = &tx->tx_sge[tx->tx_nwrq];
        struct ib_send_wr *wrq = &tx->tx_wrq[tx->tx_nwrq];
        int                nob = offsetof (kib_msg_t, ibm_u) + body_nob;

        LASSERT (net != NULL);
        LASSERT (tx->tx_nwrq >= 0);
        LASSERT (tx->tx_nwrq < IBLND_MAX_RDMA_FRAGS + 1);
        LASSERT (nob <= IBLND_MSG_SIZE);

        kiblnd_init_msg(tx->tx_msg, type, body_nob);

        sge->addr = tx->tx_msgaddr;
        sge->lkey = net->ibn_dev->ibd_mr->lkey;
        sge->length = nob;

        memset(wrq, 0, sizeof(*wrq));

        wrq->next       = NULL;
        wrq->wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_TX);
        wrq->sg_list    = sge;
        wrq->num_sge    = 1;
        wrq->opcode     = IB_WR_SEND;
        wrq->send_flags = IB_SEND_SIGNALED;

        tx->tx_nwrq++;
}

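/* Set up the work request chain for an RDMA: one RDMA_WRITE per
 * source/destination fragment pair (splitting fragments as needed),
 * followed by the PUT_DONE/GET_DONE completion message that
 * kiblnd_init_tx_msg() appends.  The chain length is bounded by
 * IBLND_MAX_RDMA_FRAGS, which is what sizes the QP. */
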
int
kiblnd_init_rdma (lnet_ni_t *ni, kib_tx_t *tx, int type,
                  int nob, kib_rdma_desc_t *dstrd, __u64 dstcookie)
{
        kib_msg_t         *ibmsg = tx->tx_msg;
        kib_rdma_desc_t   *srcrd = tx->tx_rd;
        struct ib_sge     *sge = &tx->tx_sge[0];
        struct ib_send_wr *wrq = &tx->tx_wrq[0];
        int                rc = nob;

#if IBLND_MAP_ON_DEMAND
        LASSERT (!in_interrupt());
        LASSERT (tx->tx_nwrq == 0);
        LASSERT (type == IBLND_MSG_GET_DONE ||
                 type == IBLND_MSG_PUT_DONE);

        sge->addr = srcrd->rd_addr;
        sge->lkey = srcrd->rd_key;
        sge->length = nob;

        wrq = &tx->tx_wrq[0];

        wrq->next       = &tx->tx_wrq[1];
        wrq->wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_RDMA);
        wrq->sg_list    = sge;
        wrq->num_sge    = 1;
        wrq->opcode     = IB_WR_RDMA_WRITE;
        wrq->send_flags = 0;

        wrq->wr.rdma.remote_addr = dstrd->rd_addr;
        wrq->wr.rdma.rkey        = dstrd->rd_key;

        tx->tx_nwrq = 1;
#else
        /* CAVEAT EMPTOR: this 'consumes' the frags in 'dstrd' */
        int              resid = nob;
        kib_rdma_frag_t *srcfrag;
        int              srcidx;
        kib_rdma_frag_t *dstfrag;
        int              dstidx;
        int              wrknob;

        LASSERT (!in_interrupt());
        LASSERT (tx->tx_nwrq == 0);
        LASSERT (type == IBLND_MSG_GET_DONE ||
                 type == IBLND_MSG_PUT_DONE);

        srcidx = dstidx = 0;
        srcfrag = &srcrd->rd_frags[0];
        dstfrag = &dstrd->rd_frags[0];

        while (resid > 0) {
                if (srcidx >= srcrd->rd_nfrags) {
                        CERROR("Src buffer exhausted: %d frags\n", srcidx);
                        rc = -EPROTO;
                        break;
                }

                if (dstidx == dstrd->rd_nfrags) {
                        CERROR("Dst buffer exhausted: %d frags\n", dstidx);
                        rc = -EPROTO;
                        break;
                }

                if (tx->tx_nwrq == IBLND_MAX_RDMA_FRAGS) {
                        CERROR("RDMA too fragmented: %d/%d src %d/%d dst frags\n",
                               srcidx, srcrd->rd_nfrags,
                               dstidx, dstrd->rd_nfrags);
                        rc = -EMSGSIZE;
                        break;
                }

                wrknob = MIN(MIN(srcfrag->rf_nob, dstfrag->rf_nob), resid);

                sge = &tx->tx_sge[tx->tx_nwrq];
                sge->addr   = srcfrag->rf_addr;
                sge->length = wrknob;
                sge->lkey   = srcrd->rd_key;

                wrq = &tx->tx_wrq[tx->tx_nwrq];

                wrq->next       = wrq + 1;
                wrq->wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_RDMA);
                wrq->sg_list    = sge;
                wrq->num_sge    = 1;
                wrq->opcode     = IB_WR_RDMA_WRITE;
                wrq->send_flags = 0;

                wrq->wr.rdma.remote_addr = dstfrag->rf_addr;
                wrq->wr.rdma.rkey        = dstrd->rd_key;

                wrq++;
                sge++;

                resid -= wrknob;
                if (wrknob < srcfrag->rf_nob) {
                        srcfrag->rf_nob  -= wrknob;
                        srcfrag->rf_addr += wrknob;
                } else {
                        srcfrag++;
                        srcidx++;
                }

                if (wrknob < dstfrag->rf_nob) {
                        dstfrag->rf_nob  -= wrknob;
                        dstfrag->rf_addr += wrknob;
                } else {
                        dstfrag++;
                        dstidx++;
                }

                tx->tx_nwrq++;
        }

        if (rc < 0)                             /* no RDMA if completing with failure */
                tx->tx_nwrq = 0;
#endif
        ibmsg->ibm_u.completion.ibcm_status = rc;
        ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;
        kiblnd_init_tx_msg(ni, tx, type, sizeof (kib_completion_msg_t));

        return rc;
}

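/* Queue a tx on the connection's queue appropriate to its message type:
 * PUT_REQ and GET_REQ wait for reserved credits, the handshake replies
 * (NAK/ACK/DONE) travel on the no-credit queue, NOOPs get their own queue
 * so they can be discarded if they become redundant, and IMMEDIATE takes
 * the normal queue. */
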
void
kiblnd_queue_tx_locked (kib_tx_t *tx, kib_conn_t *conn)
{
        struct list_head   *q;

        LASSERT (tx->tx_nwrq > 0);              /* work items set up */
        LASSERT (!tx->tx_queued);               /* not queued for sending already */

        tx->tx_queued = 1;
        tx->tx_deadline = jiffies + (*kiblnd_tunables.kib_timeout * HZ);

        if (tx->tx_conn == NULL) {
                kiblnd_conn_addref(conn);
                tx->tx_conn = conn;
                LASSERT (tx->tx_msg->ibm_type != IBLND_MSG_PUT_DONE);
        } else {
                /* PUT_DONE first attached to conn as a PUT_REQ */
                LASSERT (tx->tx_conn == conn);
                LASSERT (tx->tx_msg->ibm_type == IBLND_MSG_PUT_DONE);
        }

        switch (tx->tx_msg->ibm_type) {
        default:
                LBUG();

        case IBLND_MSG_PUT_REQ:
        case IBLND_MSG_GET_REQ:
                q = &conn->ibc_tx_queue_rsrvd;
                break;

        case IBLND_MSG_PUT_NAK:
        case IBLND_MSG_PUT_ACK:
        case IBLND_MSG_PUT_DONE:
        case IBLND_MSG_GET_DONE:
                q = &conn->ibc_tx_queue_nocred;
                break;

        case IBLND_MSG_NOOP:
                q = &conn->ibc_tx_noops;
                break;

        case IBLND_MSG_IMMEDIATE:
                q = &conn->ibc_tx_queue;
                break;
        }

        list_add_tail(&tx->tx_list, q);
}

void
kiblnd_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
{
        spin_lock(&conn->ibc_lock);
        kiblnd_queue_tx_locked(tx, conn);
        spin_unlock(&conn->ibc_lock);

        kiblnd_check_sends(conn);
}

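/* Initiate an active connection via the RDMA connection manager.  Address
 * resolution is asynchronous; the rest of the handshake continues in
 * kiblnd_cm_callback().  The cmid takes its own reference on the peer. */
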
void
kiblnd_connect_peer (kib_peer_t *peer)
{
        struct rdma_cm_id *cmid;
        kib_net_t         *net = peer->ibp_ni->ni_data;
        struct sockaddr_in srcaddr;
        struct sockaddr_in dstaddr;
        int                rc;

        LASSERT (net != NULL);
        LASSERT (peer->ibp_connecting > 0);

        cmid = rdma_create_id(kiblnd_cm_callback, peer, RDMA_PS_TCP);
        if (IS_ERR(cmid)) {
                CERROR("Can't create CMID for %s: %ld\n",
                       libcfs_nid2str(peer->ibp_nid), PTR_ERR(cmid));
                rc = PTR_ERR(cmid);
                goto failed;
        }

        memset(&srcaddr, 0, sizeof(srcaddr));
        srcaddr.sin_family = AF_INET;
        srcaddr.sin_addr.s_addr = htonl(net->ibn_dev->ibd_ifip);

        memset(&dstaddr, 0, sizeof(dstaddr));
        dstaddr.sin_family = AF_INET;
        dstaddr.sin_port = htons(*kiblnd_tunables.kib_service);
        dstaddr.sin_addr.s_addr = htonl(LNET_NIDADDR(peer->ibp_nid));

        kiblnd_peer_addref(peer);               /* cmid's ref */

        rc = rdma_resolve_addr(cmid,
                               (struct sockaddr *)&srcaddr,
                               (struct sockaddr *)&dstaddr,
                               *kiblnd_tunables.kib_timeout * 1000);
        if (rc == 0)
                return;

        /* Can't initiate address resolution */
        CERROR("Can't resolve addr for %s: %d\n",
               libcfs_nid2str(peer->ibp_nid), rc);

        kiblnd_peer_decref(peer);               /* cmid's ref */
        rdma_destroy_id(cmid);
 failed:
        kiblnd_peer_connect_failed(peer, 1, rc);
}

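/* Look up (or create) the peer for 'nid' and either queue the tx on an
 * existing connection or park it on the peer's queue while a connection
 * is established.  The common case is tried under the read lock; the
 * write lock is only taken when the peer table may need changing,
 * re-checking for a racing thread after each lock upgrade. */
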
1396 void
1397 kiblnd_launch_tx (lnet_ni_t *ni, kib_tx_t *tx, lnet_nid_t nid)
1398 {
1399         kib_peer_t        *peer;
1400         kib_peer_t        *peer2;
1401         kib_conn_t        *conn;
1402         rwlock_t          *g_lock = &kiblnd_data.kib_global_lock;
1403         unsigned long      flags;
1404         int                rc;
1405
1406         /* If I get here, I've committed to send, so I complete the tx with
1407          * failure on any problems */
1408
1409         LASSERT (tx == NULL || tx->tx_conn == NULL); /* only set when assigned a conn */
1410         LASSERT (tx == NULL || tx->tx_nwrq > 0);     /* work items have been set up */
1411
1412         /* First time, just use a read lock since I expect to find my peer
1413          * connected */
1414         read_lock_irqsave(g_lock, flags);
1415
1416         peer = kiblnd_find_peer_locked(nid);
1417         if (peer != NULL && !list_empty(&peer->ibp_conns)) {
1418                 /* Found a peer with an established connection */
1419                 conn = kiblnd_get_conn_locked(peer);
1420                 kiblnd_conn_addref(conn); /* 1 ref for me... */
1421
1422                 read_unlock_irqrestore(g_lock, flags);
1423
1424                 if (tx != NULL)
1425                         kiblnd_queue_tx(tx, conn);
1426                 kiblnd_conn_decref(conn); /* ...to here */
1427                 return;
1428         }
1429
1430         read_unlock(g_lock);
1431         /* Re-try with a write lock */
1432         write_lock(g_lock);
1433
1434         peer = kiblnd_find_peer_locked(nid);
1435         if (peer != NULL) {
1436                 if (list_empty(&peer->ibp_conns)) {
1437                         /* found a peer, but it's still connecting... */
1438                         LASSERT (peer->ibp_connecting != 0 ||
1439                                  peer->ibp_accepting != 0);
1440                         if (tx != NULL)
1441                                 list_add_tail(&tx->tx_list, &peer->ibp_tx_queue);
1442                         write_unlock_irqrestore(g_lock, flags);
1443                 } else {
1444                         conn = kiblnd_get_conn_locked(peer);
1445                         kiblnd_conn_addref(conn); /* 1 ref for me... */
1446                         
1447                         write_unlock_irqrestore(g_lock, flags);
1448
1449                         if (tx != NULL)
1450                                 kiblnd_queue_tx(tx, conn);
1451                         kiblnd_conn_decref(conn); /* ...to here */
1452                 }
1453                 return;
1454         }
1455
1456         write_unlock_irqrestore(g_lock, flags);
1457
1458         /* Allocate a peer ready to add to the peer table and retry */
1459         rc = kiblnd_create_peer(ni, &peer, nid);
1460         if (rc != 0) {
1461                 CERROR("Can't create peer %s\n", libcfs_nid2str(nid));
1462                 if (tx != NULL) {
1463                         tx->tx_status = -EHOSTUNREACH;
1464                         tx->tx_waiting = 0;
1465                         kiblnd_tx_done(ni, tx);
1466                 }
1467                 return;
1468         }
1469
1470         write_lock_irqsave(g_lock, flags);
1471
1472         peer2 = kiblnd_find_peer_locked(nid);
1473         if (peer2 != NULL) {
1474                 if (list_empty(&peer2->ibp_conns)) {
1475                         /* found a peer, but it's still connecting... */
1476                         LASSERT (peer2->ibp_connecting != 0 ||
1477                                  peer2->ibp_accepting != 0);
1478                         if (tx != NULL)
1479                                 list_add_tail(&tx->tx_list, &peer2->ibp_tx_queue);
1480                         write_unlock_irqrestore(g_lock, flags);
1481                 } else {
1482                         conn = kiblnd_get_conn_locked(peer2);
1483                         kiblnd_conn_addref(conn); /* 1 ref for me... */
1484
1485                         write_unlock_irqrestore(g_lock, flags);
1486
1487                         if (tx != NULL)
1488                                 kiblnd_queue_tx(tx, conn);
1489                         kiblnd_conn_decref(conn); /* ...to here */
1490                 }
1491
1492                 kiblnd_peer_decref(peer);
1493                 return;
1494         }
1495
1496         /* Brand new peer */
1497         LASSERT (peer->ibp_connecting == 0);
1498         peer->ibp_connecting = 1;
1499
1500         /* always called with a ref on ni, which prevents ni being shutdown */
1501         LASSERT (((kib_net_t *)ni->ni_data)->ibn_shutdown == 0);
1502
1503         if (tx != NULL)
1504                 list_add_tail(&tx->tx_list, &peer->ibp_tx_queue);
1505
1506         kiblnd_peer_addref(peer);
1507         list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));
1508
1509         write_unlock_irqrestore(g_lock, flags);
1510
1511         kiblnd_connect_peer(peer);
1512         kiblnd_peer_decref(peer);
1513 }
1514
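/* LNET send entry point.  ACKs and payloads that fit in a single message
 * buffer are sent IMMEDIATE; a large GET posts an RDMA sink and sends
 * GET_REQ; a large PUT/REPLY posts the RDMA source and sends PUT_REQ,
 * moving the data only when the peer answers with PUT_ACK. */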
1515 int
1516 kiblnd_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1517 {
1518         lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
1519         int               type = lntmsg->msg_type;
1520         lnet_process_id_t target = lntmsg->msg_target;
1521         int               target_is_router = lntmsg->msg_target_is_router;
1522         int               routing = lntmsg->msg_routing;
1523         unsigned int      payload_niov = lntmsg->msg_niov;
1524         struct iovec     *payload_iov = lntmsg->msg_iov;
1525         lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
1526         unsigned int      payload_offset = lntmsg->msg_offset;
1527         unsigned int      payload_nob = lntmsg->msg_len;
1528         kib_msg_t        *ibmsg;
1529         kib_tx_t         *tx;
1530         int               nob;
1531         int               rc;
1532
1533         /* NB 'private' is different depending on what we're sending.... */
1534
1535         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1536                payload_nob, payload_niov, libcfs_id2str(target));
1537
1538         LASSERT (payload_nob == 0 || payload_niov > 0);
1539         LASSERT (payload_niov <= LNET_MAX_IOV);
1540
1541         /* Thread context */
1542         LASSERT (!in_interrupt());
1543         /* payload is either all vaddrs or all pages */
1544         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1545
1546         switch (type) {
1547         default:
1548                 LBUG();
1549                 return (-EIO);
1550
1551         case LNET_MSG_ACK:
1552                 LASSERT (payload_nob == 0);
1553                 break;
1554
1555         case LNET_MSG_GET:
1556                 if (routing || target_is_router)
1557                         break;                  /* send IMMEDIATE */
1558
1559                 /* is the REPLY message too small for RDMA? */
1560                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
1561                 if (nob <= IBLND_MSG_SIZE)
1562                         break;                  /* send IMMEDIATE */
1563
1564                 tx = kiblnd_get_idle_tx(ni);
1565                 if (tx == NULL) {
1566                         CERROR("Can't allocate txd for GET to %s\n",
1567                                libcfs_nid2str(target.nid));
1568                         return -ENOMEM;
1569                 }
1570
1571                 ibmsg = tx->tx_msg;
1572                 ibmsg->ibm_u.get.ibgm_hdr = *hdr;
1573                 ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie;
1574
1575                 if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
1576                         rc = kiblnd_setup_rd_iov(ni, tx,
1577                                                  &ibmsg->ibm_u.get.ibgm_rd,
1578                                                  lntmsg->msg_md->md_niov,
1579                                                  lntmsg->msg_md->md_iov.iov,
1580                                                  0, lntmsg->msg_md->md_length);
1581                 else
1582                         rc = kiblnd_setup_rd_kiov(ni, tx,
1583                                                   &ibmsg->ibm_u.get.ibgm_rd,
1584                                                   lntmsg->msg_md->md_niov,
1585                                                   lntmsg->msg_md->md_iov.kiov,
1586                                                   0, lntmsg->msg_md->md_length);
1587                 if (rc != 0) {
1588                         CERROR("Can't setup GET sink for %s: %d\n",
1589                                libcfs_nid2str(target.nid), rc);
1590                         kiblnd_tx_done(ni, tx);
1591                         return -EIO;
1592                 }
1593 #if IBLND_MAP_ON_DEMAND
1594                 nob = sizeof(kib_get_msg_t);
1595 #else
1596                 nob = offsetof(kib_get_msg_t, ibgm_rd.rd_frags[tx->tx_nfrags]);
1597 #endif
1598                 kiblnd_init_tx_msg(ni, tx, IBLND_MSG_GET_REQ, nob);
1599
1600                 tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
1601                 if (tx->tx_lntmsg[1] == NULL) {
1602                         CERROR("Can't create reply for GET -> %s\n",
1603                                libcfs_nid2str(target.nid));
1604                         kiblnd_tx_done(ni, tx);
1605                         return -EIO;
1606                 }
1607
1608                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg[0,1] on completion */
1609                 tx->tx_waiting = 1;             /* waiting for GET_DONE */
1610                 kiblnd_launch_tx(ni, tx, target.nid);
1611                 return 0;
1612
1613         case LNET_MSG_REPLY:
1614         case LNET_MSG_PUT:
1615                 /* Is the payload small enough not to need RDMA? */
1616                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1617                 if (nob <= IBLND_MSG_SIZE)
1618                         break;                  /* send IMMEDIATE */
1619
1620                 tx = kiblnd_get_idle_tx(ni);
1621                 if (tx == NULL) {
1622                         CERROR("Can't allocate %s txd for %s\n",
1623                                type == LNET_MSG_PUT ? "PUT" : "REPLY",
1624                                libcfs_nid2str(target.nid));
1625                         return -ENOMEM;
1626                 }
1627
1628                 if (payload_kiov == NULL)
1629                         rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
1630                                                  payload_niov, payload_iov,
1631                                                  payload_offset, payload_nob);
1632                 else
1633                         rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
1634                                                   payload_niov, payload_kiov,
1635                                                   payload_offset, payload_nob);
1636                 if (rc != 0) {
1637                         CERROR("Can't setup PUT src for %s: %d\n",
1638                                libcfs_nid2str(target.nid), rc);
1639                         kiblnd_tx_done(ni, tx);
1640                         return -EIO;
1641                 }
1642
1643                 ibmsg = tx->tx_msg;
1644                 ibmsg->ibm_u.putreq.ibprm_hdr = *hdr;
1645                 ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie;
1646                 kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_REQ, sizeof(kib_putreq_msg_t));
1647
1648                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1649                 tx->tx_waiting = 1;             /* waiting for PUT_{ACK,NAK} */
1650                 kiblnd_launch_tx(ni, tx, target.nid);
1651                 return 0;
1652         }
1653
1654         /* send IMMEDIATE */
1655
1656         LASSERT (offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob])
1657                  <= IBLND_MSG_SIZE);
1658
1659         tx = kiblnd_get_idle_tx(ni);
1660         if (tx == NULL) {
1661                 CERROR ("Can't send %d to %s: tx descs exhausted\n",
1662                         type, libcfs_nid2str(target.nid));
1663                 return -ENOMEM;
1664         }
1665
1666         ibmsg = tx->tx_msg;
1667         ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1668
1669         if (payload_kiov != NULL)
1670                 lnet_copy_kiov2flat(IBLND_MSG_SIZE, ibmsg,
1671                                     offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1672                                     payload_niov, payload_kiov,
1673                                     payload_offset, payload_nob);
1674         else
1675                 lnet_copy_iov2flat(IBLND_MSG_SIZE, ibmsg,
1676                                    offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1677                                    payload_niov, payload_iov,
1678                                    payload_offset, payload_nob);
1679
1680         nob = offsetof(kib_immediate_msg_t, ibim_payload[payload_nob]);
1681         kiblnd_init_tx_msg(ni, tx, IBLND_MSG_IMMEDIATE, nob);
1682
1683         tx->tx_lntmsg[0] = lntmsg;              /* finalise lntmsg on completion */
1684         kiblnd_launch_tx(ni, tx, target.nid);
1685         return 0;
1686 }
1687
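/* Reply to an "optimized" GET: RDMA the reply payload directly into the
 * sink described by the peer's GET_REQ, then complete it with GET_DONE. */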
1688 void
1689 kiblnd_reply (lnet_ni_t *ni, kib_rx_t *rx, lnet_msg_t *lntmsg)
1690 {
1691         lnet_process_id_t target = lntmsg->msg_target;
1692         unsigned int      niov = lntmsg->msg_niov;
1693         struct iovec     *iov = lntmsg->msg_iov;
1694         lnet_kiov_t      *kiov = lntmsg->msg_kiov;
1695         unsigned int      offset = lntmsg->msg_offset;
1696         unsigned int      nob = lntmsg->msg_len;
1697         kib_tx_t         *tx;
1698         int               rc;
1699
1700         tx = kiblnd_get_idle_tx(ni);
1701         if (tx == NULL) {
1702                 CERROR("Can't get tx for REPLY to %s\n",
1703                        libcfs_nid2str(target.nid));
1704                 goto failed_0;
1705         }
1706
1707         if (nob == 0)
1708                 rc = 0;
1709         else if (kiov == NULL)
1710                 rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
1711                                          niov, iov, offset, nob);
1712         else
1713                 rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
1714                                           niov, kiov, offset, nob);
1715
1716         if (rc != 0) {
1717                 CERROR("Can't setup GET src for %s: %d\n",
1718                        libcfs_nid2str(target.nid), rc);
1719                 goto failed_1;
1720         }
1721
1722         rc = kiblnd_init_rdma(ni, tx, IBLND_MSG_GET_DONE, nob,
1723                               &rx->rx_msg->ibm_u.get.ibgm_rd,
1724                               rx->rx_msg->ibm_u.get.ibgm_cookie);
1725         if (rc < 0) {
1726                 CERROR("Can't setup rdma for GET from %s: %d\n",
1727                        libcfs_nid2str(target.nid), rc);
1728                 goto failed_1;
1729         }
1730         
1731         if (nob == 0) {
1732                 /* No RDMA: local completion may happen now! */
1733                 lnet_finalize(ni, lntmsg, 0);
1734         } else {
1735                 /* RDMA: lnet_finalize(lntmsg) when it
1736                  * completes */
1737                 tx->tx_lntmsg[0] = lntmsg;
1738         }
1739
1740         kiblnd_queue_tx(tx, rx->rx_conn);
1741         return;
1742
1743  failed_1:
1744         kiblnd_tx_done(ni, tx);
1745  failed_0:
1746         lnet_finalize(ni, lntmsg, -EIO);
1747 }
1748
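/* LNET receive entry point; 'private' is the rx the message arrived in.
 * IMMEDIATE payloads are copied straight out of the rx buffer; PUT_REQ
 * posts a sink and answers PUT_ACK (PUT_NAK on failure); GET_REQ replies
 * via kiblnd_reply() or, if nothing matched, a GET_DONE completion. */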
1749 int
1750 kiblnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
1751              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
1752              unsigned int offset, unsigned int mlen, unsigned int rlen)
1753 {
1754         kib_rx_t    *rx = private;
1755         kib_msg_t   *rxmsg = rx->rx_msg;
1756         kib_conn_t  *conn = rx->rx_conn;
1757         kib_tx_t    *tx;
1758         kib_msg_t   *txmsg;
1759         int          nob;
1760         int          post_credit = IBLND_POSTRX_PEER_CREDIT;
1761         int          rc = 0;
1762
1763         LASSERT (mlen <= rlen);
1764         LASSERT (!in_interrupt());
1765         /* Either all pages or all vaddrs */
1766         LASSERT (!(kiov != NULL && iov != NULL));
1767
1768         switch (rxmsg->ibm_type) {
1769         default:
1770                 LBUG();
1771
1772         case IBLND_MSG_IMMEDIATE:
1773                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
1774                 if (nob > rx->rx_nob) {
1775                         CERROR ("Immediate message from %s too big: %d(%d)\n",
1776                                 libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
1777                                 nob, rx->rx_nob);
1778                         rc = -EPROTO;
1779                         break;
1780                 }
1781
1782                 if (kiov != NULL)
1783                         lnet_copy_flat2kiov(niov, kiov, offset,
1784                                             IBLND_MSG_SIZE, rxmsg,
1785                                             offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1786                                             mlen);
1787                 else
1788                         lnet_copy_flat2iov(niov, iov, offset,
1789                                            IBLND_MSG_SIZE, rxmsg,
1790                                            offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1791                                            mlen);
1792                 lnet_finalize (ni, lntmsg, 0);
1793                 break;
1794
1795         case IBLND_MSG_PUT_REQ:
1796                 if (mlen == 0) {
1797                         lnet_finalize(ni, lntmsg, 0);
1798                         kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, 0,
1799                                                rxmsg->ibm_u.putreq.ibprm_cookie);
1800                         break;
1801                 }
1802
1803                 tx = kiblnd_get_idle_tx(ni);
1804                 if (tx == NULL) {
1805                         CERROR("Can't allocate tx for %s\n",
1806                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
1807                         /* Not replying will break the connection */
1808                         rc = -ENOMEM;
1809                         break;
1810                 }
1811
1812                 txmsg = tx->tx_msg;
1813                 if (kiov == NULL)
1814                         rc = kiblnd_setup_rd_iov(ni, tx,
1815                                                  &txmsg->ibm_u.putack.ibpam_rd,
1816                                                  niov, iov, offset, mlen);
1817                 else
1818                         rc = kiblnd_setup_rd_kiov(ni, tx, 
1819                                                   &txmsg->ibm_u.putack.ibpam_rd,
1820                                                   niov, kiov, offset, mlen);
1821                 if (rc != 0) {
1822                         CERROR("Can't setup PUT sink for %s: %d\n",
1823                                libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
1824                         kiblnd_tx_done(ni, tx);
1825                         /* tell peer it's over */
1826                         kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, rc,
1827                                                rxmsg->ibm_u.putreq.ibprm_cookie);
1828                         break;
1829                 }
1830
1831                 txmsg->ibm_u.putack.ibpam_src_cookie = rxmsg->ibm_u.putreq.ibprm_cookie;
1832                 txmsg->ibm_u.putack.ibpam_dst_cookie = tx->tx_cookie;
1833 #if IBLND_MAP_ON_DEMAND
1834                 nob = sizeof(kib_putack_msg_t);
1835 #else
1836                 nob = offsetof(kib_putack_msg_t, ibpam_rd.rd_frags[tx->tx_nfrags]);
1837 #endif
1838                 kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_ACK, nob);
1839
1840                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1841                 tx->tx_waiting = 1;             /* waiting for PUT_DONE */
1842                 kiblnd_queue_tx(tx, conn);
1843
1844                 /* reposted buffer reserved for PUT_DONE */
1845                 post_credit = IBLND_POSTRX_NO_CREDIT;
1846                 break;
1847
1848         case IBLND_MSG_GET_REQ:
1849                 if (lntmsg != NULL) {
1850                         /* Optimized GET; RDMA lntmsg's payload */
1851                         kiblnd_reply(ni, rx, lntmsg);
1852                 } else {
1853                         /* GET didn't match anything */
1854                         kiblnd_send_completion(rx->rx_conn, IBLND_MSG_GET_DONE,
1855                                                -ENODATA,
1856                                                rxmsg->ibm_u.get.ibgm_cookie);
1857                 }
1858                 break;
1859         }
1860
1861         kiblnd_post_rx(rx, post_credit);
1862         return rc;
1863 }
1864
1865 int
1866 kiblnd_thread_start (int (*fn)(void *arg), void *arg)
1867 {
1868         long    pid = kernel_thread (fn, arg, 0);
1869
1870         if (pid < 0)
1871                 return ((int)pid);
1872
1873         atomic_inc (&kiblnd_data.kib_nthreads);
1874         return (0);
1875 }
1876
1877 void
1878 kiblnd_thread_fini (void)
1879 {
1880         atomic_dec (&kiblnd_data.kib_nthreads);
1881 }
1882
1883 void
1884 kiblnd_peer_alive (kib_peer_t *peer)
1885 {
1886         /* This is racy, but everyone's only writing cfs_time_current() */
1887         peer->ibp_last_alive = cfs_time_current();
1888         mb();
1889 }
1890
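/* Notify LNET that a peer is down, but only once its last connection
 * attempt has failed and no conns remain; ibp_error is consumed here. */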
1891 void
1892 kiblnd_peer_notify (kib_peer_t *peer)
1893 {
1894         time_t        last_alive = 0;
1895         int           error = 0;
1896         unsigned long flags;
1897
1898         read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1899
1900         if (list_empty(&peer->ibp_conns) &&
1901             peer->ibp_accepting == 0 &&
1902             peer->ibp_connecting == 0 &&
1903             peer->ibp_error != 0) {
1904                 error = peer->ibp_error;
1905                 peer->ibp_error = 0;
1906
1907                 last_alive = cfs_time_current_sec() -
1908                              cfs_duration_sec(cfs_time_current() -
1909                                               peer->ibp_last_alive);
1910         }
1911
1912         read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1913
1914         if (error != 0)
1915                 lnet_notify(peer->ibp_ni,
1916                             peer->ibp_nid, 0, last_alive);
1917 }
1918
1919 void
1920 kiblnd_close_conn_locked (kib_conn_t *conn, int error)
1921 {
1922         /* This just does the immediate housekeeping.  'error' is zero for a
1923          * normal shutdown which can happen only after the connection has been
1924          * established.  If the connection is established, schedule the
1925          * connection to be finished off by the connd.  Otherwise the connd is
1926          * already dealing with it (either to set it up or tear it down).
1927          * Caller holds kib_global_lock exclusively in irq context */
1928         unsigned long     flags;
1929         kib_peer_t       *peer = conn->ibc_peer;
1930
1931         LASSERT (error != 0 || conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1932
1933         if (error != 0 && conn->ibc_comms_error == 0)
1934                 conn->ibc_comms_error = error;
1935
1936         if (conn->ibc_state != IBLND_CONN_ESTABLISHED)
1937                 return; /* already being handled  */
1938
1939         if (error == 0 &&
1940             list_empty(&conn->ibc_tx_noops) &&
1941             list_empty(&conn->ibc_tx_queue) &&
1942             list_empty(&conn->ibc_tx_queue_rsrvd) &&
1943             list_empty(&conn->ibc_tx_queue_nocred) &&
1944             list_empty(&conn->ibc_active_txs)) {
1945                 CDEBUG(D_NET, "closing conn to %s\n", 
1946                        libcfs_nid2str(peer->ibp_nid));
1947         } else {
1948                 CDEBUG(D_NETERROR, "Closing conn to %s: error %d%s%s%s%s%s\n",
1949                        libcfs_nid2str(peer->ibp_nid), error,
1950                        list_empty(&conn->ibc_tx_queue) ? "" : "(sending)",
1951                        list_empty(&conn->ibc_tx_noops) ? "" : "(sending_noops)",
1952                        list_empty(&conn->ibc_tx_queue_rsrvd) ? "" : "(sending_rsrvd)",
1953                        list_empty(&conn->ibc_tx_queue_nocred) ? "" : "(sending_nocred)",
1954                        list_empty(&conn->ibc_active_txs) ? "" : "(waiting)");
1955         }
1956
1957         list_del (&conn->ibc_list);
1958         /* connd (see below) takes over ibc_list's ref */
1959
1960         if (list_empty (&peer->ibp_conns) &&    /* no more conns */
1961             kiblnd_peer_active(peer)) {         /* still in peer table */
1962                 kiblnd_unlink_peer_locked(peer);
1963
1964                 /* set/clear error on last conn */
1965                 peer->ibp_error = conn->ibc_comms_error;
1966         }
1967
1968         kiblnd_set_conn_state(conn, IBLND_CONN_CLOSING);
1969
1970         spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
1971
1972         list_add_tail (&conn->ibc_list, &kiblnd_data.kib_connd_conns);
1973         wake_up (&kiblnd_data.kib_connd_waitq);
1974
1975         spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
1976 }
1977
1978 void
1979 kiblnd_close_conn (kib_conn_t *conn, int error)
1980 {
1981         unsigned long flags;
1982
1983         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1984
1985         kiblnd_close_conn_locked(conn, error);
1986
1987         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1988 }
1989
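/* Handle rxs that arrived while the connection was still being
 * established. */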
1990 void
1991 kiblnd_handle_early_rxs(kib_conn_t *conn)
1992 {
1993         unsigned long    flags;
1994         kib_rx_t        *rx;
1995
1996         LASSERT (!in_interrupt());
1997         LASSERT (conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1998
1999         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2000         while (!list_empty(&conn->ibc_early_rxs)) {
2001                 rx = list_entry(conn->ibc_early_rxs.next,
2002                                 kib_rx_t, rx_list);
2003                 list_del(&rx->rx_list);
2004                 write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2005
2006                 kiblnd_handle_rx(rx);
2007
2008                 write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2009         }
2010         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2011 }
2012
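/* Fail every tx on 'txs' with -ECONNABORTED.  Txs with sends still
 * outstanding are only flagged here; they complete when their send
 * completions arrive. */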
2013 void
2014 kiblnd_abort_txs(kib_conn_t *conn, struct list_head *txs)
2015 {
2016         LIST_HEAD           (zombies); 
2017         struct list_head    *tmp;
2018         struct list_head    *nxt;
2019         kib_tx_t            *tx;
2020
2021         spin_lock(&conn->ibc_lock);
2022
2023         list_for_each_safe (tmp, nxt, txs) {
2024                 tx = list_entry (tmp, kib_tx_t, tx_list);
2025
2026                 if (txs == &conn->ibc_active_txs) {
2027                         LASSERT (!tx->tx_queued);
2028                         LASSERT (tx->tx_waiting ||
2029                                  tx->tx_sending != 0);
2030                 } else {
2031                         LASSERT (tx->tx_queued);
2032                 }
2033
2034                 tx->tx_status = -ECONNABORTED;
2035                 tx->tx_waiting = 0;
2036
2037                 if (tx->tx_sending == 0) {
2038                         tx->tx_queued = 0;
2039                         list_del (&tx->tx_list);
2040                         list_add (&tx->tx_list, &zombies);
2041                 }
2042         }
2043
2044         spin_unlock(&conn->ibc_lock);
2045
2046         kiblnd_txlist_done(conn->ibc_peer->ibp_ni,
2047                            &zombies, -ECONNABORTED);
2048 }
2049
2050 void
2051 kiblnd_finalise_conn (kib_conn_t *conn)
2052 {
2053         LASSERT (!in_interrupt());
2054         LASSERT (conn->ibc_state > IBLND_CONN_INIT);
2055
2056         kiblnd_set_conn_state(conn, IBLND_CONN_DISCONNECTED);
2057
2058         /* abort_receives moves QP state to IB_QPS_ERR.  This is only required
2059          * for connections that didn't get as far as being connected, because
2060          * rdma_disconnect() does this for free. */
2061         kiblnd_abort_receives(conn);
2062
2063         /* Complete all tx descs not waiting for sends to complete.
2064          * NB we should be safe from RDMA now that the QP has changed state */
2065
2066         kiblnd_abort_txs(conn, &conn->ibc_tx_noops);
2067         kiblnd_abort_txs(conn, &conn->ibc_tx_queue);
2068         kiblnd_abort_txs(conn, &conn->ibc_tx_queue_rsrvd);
2069         kiblnd_abort_txs(conn, &conn->ibc_tx_queue_nocred);
2070         kiblnd_abort_txs(conn, &conn->ibc_active_txs);
2071
2072         kiblnd_handle_early_rxs(conn);
2073 }
2074
2075 void
2076 kiblnd_peer_connect_failed (kib_peer_t *peer, int active, int error)
2077 {
2078         LIST_HEAD        (zombies);
2079         unsigned long     flags;
2080
2081         LASSERT (error != 0);
2082         LASSERT (!in_interrupt());
2083
2084         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2085
2086         if (active) {
2087                 LASSERT (peer->ibp_connecting > 0);
2088                 peer->ibp_connecting--;
2089         } else {
2090                 LASSERT (peer->ibp_accepting > 0);
2091                 peer->ibp_accepting--;
2092         }
2093
2094         if (peer->ibp_connecting != 0 ||
2095             peer->ibp_accepting != 0) {
2096                 /* another connection attempt under way... */
2097                 write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2098                 return;
2099         }
2100
2101         if (list_empty(&peer->ibp_conns)) {
2102                 /* Take peer's blocked transmits to complete with error */
2103                 list_add(&zombies, &peer->ibp_tx_queue);
2104                 list_del_init(&peer->ibp_tx_queue);
2105
2106                 if (kiblnd_peer_active(peer))
2107                         kiblnd_unlink_peer_locked(peer);
2108
2109                 peer->ibp_error = error;
2110         } else {
2111                 /* Can't have blocked transmits if there are connections */
2112                 LASSERT (list_empty(&peer->ibp_tx_queue));
2113         }
2114
2115         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2116
2117         kiblnd_peer_notify(peer);
2118
2119         if (list_empty (&zombies))
2120                 return;
2121
2122         CDEBUG (D_NETERROR, "Deleting messages for %s: connection failed\n",
2123                 libcfs_nid2str(peer->ibp_nid));
2124
2125         kiblnd_txlist_done(peer->ibp_ni, &zombies, -EHOSTUNREACH);
2126 }
2127
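/* The connection handshake has completed (status == 0) or failed.  On
 * success the conn becomes ESTABLISHED, stale conns from an earlier peer
 * incarnation are closed, and transmits that queued on the peer while it
 * was connecting are scheduled on the new conn. */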
2128 void
2129 kiblnd_connreq_done(kib_conn_t *conn, int status)
2130 {
2131         struct list_head   txs;
2132
2133         kib_peer_t        *peer = conn->ibc_peer;
2134         int                active;
2135         unsigned long      flags;
2136         kib_tx_t          *tx;
2137
2138         active = (conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2139
2140         CDEBUG(D_NET, "%s: active %d, status %d\n",
2141                libcfs_nid2str(peer->ibp_nid), active, status);
2142
2143         LASSERT (!in_interrupt());
2144         LASSERT ((conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT &&
2145                   peer->ibp_connecting > 0) ||
2146                  (conn->ibc_state == IBLND_CONN_PASSIVE_WAIT &&
2147                   peer->ibp_accepting > 0));
2148
2149         LIBCFS_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
2150         conn->ibc_connvars = NULL;
2151
2152         if (status != 0) {
2153                 /* failed to establish connection */
2154                 kiblnd_peer_connect_failed(peer, active, status);
2155                 kiblnd_finalise_conn(conn);
2156                 return;
2157         }
2158
2159         /* connection established */
2160         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2161
2162         conn->ibc_last_send = jiffies;
2163         kiblnd_set_conn_state(conn, IBLND_CONN_ESTABLISHED);
2164         kiblnd_peer_alive(peer);
2165
2166         /* Add conn to peer's list and nuke any dangling conns from a different
2167          * peer instance... */
2168         kiblnd_conn_addref(conn);               /* +1 ref for ibc_list */
2169         list_add(&conn->ibc_list, &peer->ibp_conns);
2170         if (active)
2171                 peer->ibp_connecting--;
2172         else
2173                 peer->ibp_accepting--;
2174
2175         kiblnd_close_stale_conns_locked(peer, conn->ibc_incarnation);
2176
2177         /* grab pending txs while I have the lock */
2178         list_add(&txs, &peer->ibp_tx_queue);
2179         list_del_init(&peer->ibp_tx_queue);
2180
2181         if (!kiblnd_peer_active(peer) ||        /* peer has been deleted */
2182             conn->ibc_comms_error != 0) {       /* error has happened already */
2183                 lnet_ni_t *ni = peer->ibp_ni;
2184
2185                 /* start to shut down connection */
2186                 kiblnd_close_conn_locked(conn, -ECONNABORTED);
2187                 write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2188
2189                 kiblnd_txlist_done(ni, &txs, -ECONNABORTED);
2190
2191                 return;
2192         }
2193
2194         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2195
2196         /* Schedule blocked txs */
2197         spin_lock (&conn->ibc_lock);
2198         while (!list_empty (&txs)) {
2199                 tx = list_entry (txs.next, kib_tx_t, tx_list);
2200                 list_del (&tx->tx_list);
2201
2202                 kiblnd_queue_tx_locked(tx, conn);
2203         }
2204         spin_unlock (&conn->ibc_lock);
2205
2206         kiblnd_check_sends(conn);
2207
2208         /* schedule blocked rxs */
2209         kiblnd_handle_early_rxs(conn);
2210 }
2211
2212 void
2213 kiblnd_reject(struct rdma_cm_id *cmid, int why)
2214 {
2215         int          rc;
2216         kib_rej_t    rej = {.ibr_magic   = IBLND_MSG_MAGIC,
2217                             .ibr_version = IBLND_MSG_VERSION,
2218                             .ibr_why     = why};
2219
2220         rc = rdma_reject(cmid, &rej, sizeof(rej));
2221
2222         if (rc != 0)
2223                 CWARN("Error %d sending reject\n", rc);
2224 }
2225
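/* Handle an incoming connection request: validate the CONNREQ carried in
 * the private data (magic, version, connection parameters, dstnid), create
 * the peer or reuse an existing one (breaking connection races in favour
 * of the higher NID), then accept with a CONNACK in the private data.  Any
 * failure sends a reject carrying the reason. */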
2226 int
2227 kiblnd_passive_connect (struct rdma_cm_id *cmid, void *priv, int priv_nob)
2228 {
2229         kib_msg_t             *ackmsg;
2230         kib_msg_t             *reqmsg = priv;
2231         rwlock_t              *g_lock = &kiblnd_data.kib_global_lock;
2232         struct rdma_conn_param cp;
2233         unsigned long          flags;
2234         lnet_ni_t             *ni = NULL;
2235         kib_dev_t             *ibdev;
2236         kib_peer_t            *peer;
2237         kib_peer_t            *peer2;
2238         kib_conn_t            *conn;
2239         lnet_nid_t             nid;
2240         int                    rc;
2241         int                    rej = IBLND_REJECT_FATAL;
2242
2243         LASSERT (!in_interrupt());
2244
2245         /* cmid inherits 'context' from the corresponding listener id */
2246         ibdev = (kib_dev_t *)cmid->context;
2247         LASSERT (ibdev != NULL);
2248
2249         if (priv_nob < offsetof(kib_msg_t, ibm_type)) {
2250                 CERROR("Short connection request\n");
2251                 goto failed;
2252         }
2253
2254         if (reqmsg->ibm_magic == LNET_PROTO_MAGIC ||
2255             reqmsg->ibm_magic == __swab32(LNET_PROTO_MAGIC) ||
2256             (reqmsg->ibm_magic == IBLND_MSG_MAGIC &&
2257              reqmsg->ibm_version != IBLND_MSG_VERSION) ||
2258             (reqmsg->ibm_magic == __swab32(IBLND_MSG_MAGIC) &&
2259              reqmsg->ibm_version != __swab16(IBLND_MSG_VERSION))) {
2260                 /* Future protocol version compatibility support!  If the
2261                  * o2iblnd-specific protocol changes, or when LNET unifies
2262                  * protocols over all LNDs, the initial connection will
2263                  * negotiate a protocol version.  I trap this here to avoid
2264                  * console errors; the reject tells the peer which protocol I
2265                  * speak. */
2266                 goto failed;
2267         }
2268
2269         rc = kiblnd_unpack_msg(reqmsg, priv_nob);
2270         if (rc != 0) {
2271                 CERROR("Can't parse connection request: %d\n", rc);
2272                 goto failed;
2273         }
2274
2275         nid = reqmsg->ibm_srcnid;
2276
2277         if (reqmsg->ibm_type != IBLND_MSG_CONNREQ) {
2278                 CERROR("Unexpected connreq msg type: %x from %s\n",
2279                        reqmsg->ibm_type, libcfs_nid2str(nid));
2280                 goto failed;
2281         }
2282
2283         if (reqmsg->ibm_u.connparams.ibcp_queue_depth != IBLND_MSG_QUEUE_SIZE) {
2284                 CERROR("Can't accept %s: incompatible queue depth %d (%d wanted)\n",
2285                        libcfs_nid2str(nid),
2286                        reqmsg->ibm_u.connparams.ibcp_queue_depth,
2287                        IBLND_MSG_QUEUE_SIZE);
2288                 goto failed;
2289         }
2290
2291         if (reqmsg->ibm_u.connparams.ibcp_max_frags != IBLND_MAX_RDMA_FRAGS) {
2292                 CERROR("Can't accept %s: incompatible max_frags %d (%d wanted)\n",
2293                        libcfs_nid2str(nid),
2294                        reqmsg->ibm_u.connparams.ibcp_max_frags,
2295                        IBLND_MAX_RDMA_FRAGS);
2296                 goto failed;
2297         }
2298
2299         if (reqmsg->ibm_u.connparams.ibcp_max_msg_size > IBLND_MSG_SIZE) {
2300                 CERROR("Can't accept %s: message size %d too big (%d max)\n",
2301                        libcfs_nid2str(nid),
2302                        reqmsg->ibm_u.connparams.ibcp_max_msg_size,
2303                        IBLND_MSG_SIZE);
2304                 goto failed;
2305         }
2306
2307         ni = lnet_net2ni(LNET_NIDNET(reqmsg->ibm_dstnid));
2308         if (ni == NULL ||                               /* no matching net */
2309             ni->ni_nid != reqmsg->ibm_dstnid ||   /* right NET, wrong NID! */
2310             ((kib_net_t*)ni->ni_data)->ibn_dev != ibdev) { /* wrong device */
2311                 CERROR("Can't accept %s: bad dst nid %s\n",
2312                        libcfs_nid2str(nid),
2313                        libcfs_nid2str(reqmsg->ibm_dstnid));
2314
2315                 goto failed;
2316         }
2317         
2318         /* assume 'nid' is a new peer; create a new one */
2319         rc = kiblnd_create_peer(ni, &peer, nid);
2320         if (rc != 0) {
2321                 CERROR("Can't create peer for %s\n", libcfs_nid2str(nid));
2322                 rej = IBLND_REJECT_NO_RESOURCES;
2323                 goto failed;
2324         }
2325
2326         write_lock_irqsave(g_lock, flags);
2327
2328         peer2 = kiblnd_find_peer_locked(nid);
2329         if (peer2 != NULL) {
2330                 /* tie-break connection race in favour of the higher NID */                
2331                 if (peer2->ibp_connecting != 0 &&
2332                     nid < ni->ni_nid) {
2333                         write_unlock_irqrestore(g_lock, flags);
2334
2335                         CWARN("Conn race %s\n",
2336                               libcfs_nid2str(peer2->ibp_nid));
2337
2338                         kiblnd_peer_decref(peer);
2339                         rej = IBLND_REJECT_CONN_RACE;
2340                         goto failed;
2341                 }
2342
2343                 peer2->ibp_accepting++;
2344                 kiblnd_peer_addref(peer2);
2345
2346                 write_unlock_irqrestore(g_lock, flags);
2347                 kiblnd_peer_decref(peer);
2348                 peer = peer2;
2349         } else {
2350                 /* Brand new peer */
2351                 LASSERT (peer->ibp_accepting == 0);
2352                 peer->ibp_accepting = 1;
2353
2354                 /* I have a ref on ni that prevents it being shutdown */
2355                 LASSERT (((kib_net_t *)ni->ni_data)->ibn_shutdown == 0);
2356
2357                 kiblnd_peer_addref(peer);
2358                 list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));
2359
2360                 write_unlock_irqrestore(g_lock, flags);
2361         }
2362
2363         conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_PASSIVE_WAIT);
2364         if (conn == NULL) {
2365                 kiblnd_peer_connect_failed(peer, 0, -ENOMEM);
2366                 kiblnd_peer_decref(peer);
2367                 rej = IBLND_REJECT_NO_RESOURCES;
2368                 goto failed;
2369         }
2370
2371         /* conn now "owns" cmid, so I return success from here on to ensure the
2372          * CM callback doesn't destroy cmid. */
2373
2374         conn->ibc_incarnation      = reqmsg->ibm_srcstamp;
2375         conn->ibc_credits          = IBLND_MSG_QUEUE_SIZE;
2376         conn->ibc_reserved_credits = IBLND_MSG_QUEUE_SIZE;
2377         LASSERT (conn->ibc_credits + conn->ibc_reserved_credits
2378                  <= IBLND_RX_MSGS);
2379
2380         ackmsg = &conn->ibc_connvars->cv_msg;
2381         memset(ackmsg, 0, sizeof(*ackmsg));
2382
2383         kiblnd_init_msg(ackmsg, IBLND_MSG_CONNACK,
2384                         sizeof(ackmsg->ibm_u.connparams));
2385         ackmsg->ibm_u.connparams.ibcp_queue_depth = IBLND_MSG_QUEUE_SIZE;
2386         ackmsg->ibm_u.connparams.ibcp_max_frags = IBLND_MAX_RDMA_FRAGS;
2387         ackmsg->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;
2388         kiblnd_pack_msg(ni, ackmsg, 0, nid, reqmsg->ibm_srcstamp);
2389
2390         memset(&cp, 0, sizeof(cp));
2391         cp.private_data        = ackmsg;
2392         cp.private_data_len    = ackmsg->ibm_nob;
2393         cp.responder_resources = 0;             /* No atomic ops or RDMA reads */
2394         cp.initiator_depth     = 0;
2395         cp.flow_control        = 1;
2396         cp.retry_count         = *kiblnd_tunables.kib_retry_count;
2397         cp.rnr_retry_count     = *kiblnd_tunables.kib_rnr_retry_count;
2398
2399         CDEBUG(D_NET, "Accept %s\n", libcfs_nid2str(nid));
2400
2401         rc = rdma_accept(cmid, &cp);
2402         if (rc != 0) {
2403                 CERROR("Can't accept %s: %d\n", libcfs_nid2str(nid), rc);
2404                 kiblnd_reject(cmid, IBLND_REJECT_FATAL);
2405                 kiblnd_connreq_done(conn, rc);
2406                 kiblnd_conn_decref(conn);
2407         }
2408
2409         lnet_ni_decref(ni);
2410         return 0;
2411
2412  failed:
2413         if (ni != NULL)
2414                 lnet_ni_decref(ni);
2415
2416         kiblnd_reject(cmid, rej);
2417         return -ECONNREFUSED;
2418 }
2419
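/* Retry an active connect that was rejected, but only while transmits are
 * still queued and no other connection attempt is in progress. */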
2420 void
2421 kiblnd_reconnect (kib_conn_t *conn, char *why)
2422 {
2423         kib_peer_t    *peer = conn->ibc_peer;
2424         int            retry = 0;
2425         unsigned long  flags;
2426         
2427         LASSERT (conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2428         LASSERT (peer->ibp_connecting > 0);     /* 'conn' at least */
2429
2430         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2431
2432         /* retry connection if it's still needed and no other connection
2433          * attempts (active or passive) are in progress */
2434         if (!list_empty(&peer->ibp_tx_queue) &&
2435             peer->ibp_connecting == 1 &&
2436             peer->ibp_accepting == 0) {
2437                 retry = 1;
2438                 peer->ibp_connecting++;
2439         }
2440         
2441         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2442
2443         if (retry) {
2444                 CDEBUG(D_NETERROR, "%s: retrying (%s)\n", 
2445                        libcfs_nid2str(peer->ibp_nid), why);
2446                 kiblnd_connect_peer(peer);
2447         }
2448 }
2449
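/* An active connect was rejected.  Consumer-defined rejects carry a
 * kib_rej_t (byte-swapped here if the peer's endianness differs);
 * connection races and stale conns are retried, anything else fails the
 * attempt. */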
2450 void
2451 kiblnd_rejected (kib_conn_t *conn, int reason, void *priv, int priv_nob)
2452 {
2453         kib_peer_t    *peer = conn->ibc_peer;
2454
2455         LASSERT (!in_interrupt());
2456         LASSERT (conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2457
2458         switch (reason) {
2459         case IB_CM_REJ_STALE_CONN:
2460                 kiblnd_reconnect(conn, "stale");
2461                 break;
2462
2463         case IB_CM_REJ_CONSUMER_DEFINED:
2464                 if (priv_nob >= sizeof(kib_rej_t)) {
2465                         kib_rej_t *rej = priv;
2466
2467                         if (rej->ibr_magic == __swab32(IBLND_MSG_MAGIC) ||
2468                             rej->ibr_magic == __swab32(LNET_PROTO_MAGIC)) {
2469                                 __swab32s(&rej->ibr_magic);
2470                                 __swab16s(&rej->ibr_version);
2471                         }
2472
2473                         if (rej->ibr_magic != IBLND_MSG_MAGIC &&
2474                             rej->ibr_magic != LNET_PROTO_MAGIC) {
2475                                 CERROR("%s rejected: consumer defined fatal error\n",
2476                                        libcfs_nid2str(peer->ibp_nid));
2477                                 break;
2478                         }
2479                         
2480                         if (rej->ibr_version != IBLND_MSG_VERSION) {
2481                                 CERROR("%s rejected: o2iblnd version %d error\n",
2482                                        libcfs_nid2str(peer->ibp_nid),
2483                                        rej->ibr_version);
2484                                 break;
2485                         }
2486                         
2487                         switch (rej->ibr_why) {
2488                         case IBLND_REJECT_CONN_RACE:
2489                                 kiblnd_reconnect(conn, "conn race");
2490                                 break;
2491                                 
2492                         case IBLND_REJECT_NO_RESOURCES:
2493                                 CERROR("%s rejected: o2iblnd no resources\n",
2494                                        libcfs_nid2str(peer->ibp_nid));
2495                                 break;
2496                         case IBLND_REJECT_FATAL:
2497                                 CERROR("%s rejected: o2iblnd fatal error\n",
2498                                        libcfs_nid2str(peer->ibp_nid));
2499                                 break;
2500                         default:
2501                                 CERROR("%s rejected: o2iblnd reason %d\n",
2502                                        libcfs_nid2str(peer->ibp_nid),
2503                                        rej->ibr_why);
2504                                 break;
2505                         }
2506                         break;
2507                 }
2508                 /* fall through */
2509         default:
2510                 CDEBUG(D_NETERROR, "%s rejected: reason %d, size %d\n",
2511                        libcfs_nid2str(peer->ibp_nid), reason, priv_nob);
2512                 break;
2513         }
2514
2515         kiblnd_connreq_done(conn, -ECONNREFUSED);
2516 }
2517
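/* Validate the CONNACK an active connect received with the ESTABLISHED
 * event.  The QP is already up by this point, so failures are recorded in
 * ibc_comms_error instead of aborting outright. */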
2518 void
2519 kiblnd_check_connreply (kib_conn_t *conn, void *priv, int priv_nob)
2520 {
2521         kib_peer_t    *peer = conn->ibc_peer;
2522         lnet_ni_t     *ni = peer->ibp_ni;
2523         kib_net_t     *net = ni->ni_data;
2524         kib_msg_t     *msg = priv;
2525         int            rc = kiblnd_unpack_msg(msg, priv_nob);
2526         unsigned long  flags;
2527
2528         LASSERT (net != NULL);
2529
2530         if (rc != 0) {
2531                 CERROR("Can't unpack connack from %s: %d\n",
2532                        libcfs_nid2str(peer->ibp_nid), rc);
2533                 goto failed;
2534         }
2535
2536         if (msg->ibm_type != IBLND_MSG_CONNACK) {
2537                 CERROR("Unexpected message %d from %s\n",
2538                        msg->ibm_type, libcfs_nid2str(peer->ibp_nid));
2539                 rc = -EPROTO;
2540                 goto failed;
2541         }
2542
2543         if (msg->ibm_u.connparams.ibcp_queue_depth != IBLND_MSG_QUEUE_SIZE) {
2544                 CERROR("%s has incompatible queue depth %d (%d wanted)\n",
2545                        libcfs_nid2str(peer->ibp_nid),
2546                        msg->ibm_u.connparams.ibcp_queue_depth,
2547                        IBLND_MSG_QUEUE_SIZE);
2548                 rc = -EPROTO;
2549                 goto failed;
2550         }
2551
2552         if (msg->ibm_u.connparams.ibcp_max_frags != IBLND_MAX_RDMA_FRAGS) {
2553                 CERROR("%s has incompatible max_frags %d (%d wanted)\n",
2554                        libcfs_nid2str(peer->ibp_nid),
2555                        msg->ibm_u.connparams.ibcp_max_frags,
2556                        IBLND_MAX_RDMA_FRAGS);
2557                 rc = -EPROTO;
2558                 goto failed;
2559         }
2560
2561         if (msg->ibm_u.connparams.ibcp_max_msg_size > IBLND_MSG_SIZE) {
2562                 CERROR("%s max message size %d too big (%d max)\n",
2563                        libcfs_nid2str(peer->ibp_nid),
2564                        msg->ibm_u.connparams.ibcp_max_msg_size,
2565                        IBLND_MSG_SIZE);
2566                 rc = -EPROTO;
2567                 goto failed;
2568         }
2569
2570         read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2571         if (msg->ibm_dstnid == ni->ni_nid &&
2572             msg->ibm_dststamp == net->ibn_incarnation)
2573                 rc = 0;
2574         else
2575                 rc = -ESTALE;
2576         read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2577
2578         if (rc != 0) {
2579                 CERROR("Stale connection reply from %s\n",
2580                        libcfs_nid2str(peer->ibp_nid));
2581                 goto failed;
2582         }
2583
2584         conn->ibc_incarnation      = msg->ibm_srcstamp;
2585         conn->ibc_credits          = IBLND_MSG_QUEUE_SIZE;
2586         conn->ibc_reserved_credits = IBLND_MSG_QUEUE_SIZE;
2587         LASSERT (conn->ibc_credits + conn->ibc_reserved_credits
2588                  <= IBLND_RX_MSGS);
2589
2590         kiblnd_connreq_done(conn, 0);
2591         return;
2592
2593  failed:
2594         /* NB My QP has already established itself, so I handle anything going
2595          * wrong here by setting ibc_comms_error.
2596          * kiblnd_connreq_done(0) moves the conn state to ESTABLISHED, but then
2597          * immediately tears it down. */
2598
2599         LASSERT (rc != 0);
2600         conn->ibc_comms_error = rc;
2601         kiblnd_connreq_done(conn, 0);
2602 }
2603
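/* Address and route are resolved: create the conn, pack a CONNREQ into the
 * private data and connect.  From here the conn owns the cmid and the
 * cmid's ref on the peer. */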
2604 int
2605 kiblnd_active_connect (struct rdma_cm_id *cmid)
2606 {
2607         kib_peer_t              *peer = (kib_peer_t *)cmid->context;
2608         kib_conn_t              *conn;
2609         kib_msg_t               *msg;
2610         struct rdma_conn_param   cp;
2611         int                      rc;
2612
2613         conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_ACTIVE_CONNECT);
2614         if (conn == NULL) {
2615                 kiblnd_peer_connect_failed(peer, 1, -ENOMEM);
2616                 kiblnd_peer_decref(peer); /* lose cmid's ref */
2617                 return -ENOMEM;
2618         }
2619
2620         /* conn "owns" cmid now, so I return success from here on to ensure the
2621          * CM callback doesn't destroy cmid. conn also takes over cmid's ref
2622          * on peer */
2623
2624         msg = &conn->ibc_connvars->cv_msg;
2625
2626         memset(msg, 0, sizeof(*msg));
2627         kiblnd_init_msg(msg, IBLND_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
2628         msg->ibm_u.connparams.ibcp_queue_depth = IBLND_MSG_QUEUE_SIZE;
2629         msg->ibm_u.connparams.ibcp_max_frags = IBLND_MAX_RDMA_FRAGS;
2630         msg->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;
2631         kiblnd_pack_msg(peer->ibp_ni, msg, 0, peer->ibp_nid, 0);
2632         
2633         memset(&cp, 0, sizeof(cp));
2634         cp.private_data        = msg;
2635         cp.private_data_len    = msg->ibm_nob;
2636         cp.responder_resources = 0;             /* No atomic ops or RDMA reads */
2637         cp.initiator_depth     = 0;
2638         cp.flow_control        = 1;
2639         cp.retry_count         = *kiblnd_tunables.kib_retry_count;
2640         cp.rnr_retry_count     = *kiblnd_tunables.kib_rnr_retry_count;
2641
2642         LASSERT(cmid->context == (void *)conn);
2643         LASSERT(conn->ibc_cmid == cmid);
2644         
2645         rc = rdma_connect(cmid, &cp);
2646         if (rc != 0) {
2647                 CERROR("Can't connect to %s: %d\n",
2648                        libcfs_nid2str(peer->ibp_nid), rc);
2649                 kiblnd_connreq_done(conn, rc);
2650                 kiblnd_conn_decref(conn);
2651         }
2652
2653         return 0;
2654 }
2655
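/* RDMA CM event dispatcher.  cmid->context is the listening device for
 * CONNECT_REQUEST, the peer while an active connect resolves its address
 * and route, and the conn thereafter; returning non-zero makes the CM
 * destroy the cmid. */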
2656 int
2657 kiblnd_cm_callback(struct rdma_cm_id *cmid, struct rdma_cm_event *event)
2658 {
2659         kib_peer_t  *peer;
2660         kib_conn_t  *conn;
2661         int          rc;
2662
2663         switch (event->event) {
2664         default:
2665                 LBUG();
2666
2667         case RDMA_CM_EVENT_CONNECT_REQUEST:
2668                 /* destroy cmid on failure */
2669                 rc = kiblnd_passive_connect(cmid, 
2670                                             (void *)KIBLND_CONN_PARAM(event),
2671                                             KIBLND_CONN_PARAM_LEN(event));
2672                 CDEBUG(D_NET, "connreq: %d\n", rc);
2673                 return rc;
2674                 
2675         case RDMA_CM_EVENT_ADDR_ERROR:
2676                 peer = (kib_peer_t *)cmid->context;
2677                 CDEBUG(D_NETERROR, "%s: ADDR ERROR %d\n",
2678                        libcfs_nid2str(peer->ibp_nid), event->status);
2679                 kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
2680                 kiblnd_peer_decref(peer);
2681                 return -EHOSTUNREACH;      /* rc != 0 destroys cmid */
2682
2683         case RDMA_CM_EVENT_ADDR_RESOLVED:
2684                 peer = (kib_peer_t *)cmid->context;
2685
2686                 CDEBUG(D_NET,"%s Addr resolved: %d\n",
2687                        libcfs_nid2str(peer->ibp_nid), event->status);
2688
2689                 if (event->status != 0) {
2690                         CDEBUG(D_NETERROR, "Can't resolve address for %s: %d\n",
2691                                libcfs_nid2str(peer->ibp_nid), event->status);
2692                         rc = event->status;
2693                 } else {
2694                         rc = rdma_resolve_route(
2695                                 cmid, *kiblnd_tunables.kib_timeout * 1000);
2696                         if (rc == 0)
2697                                 return 0;
2698                         /* Can't initiate route resolution */
2699                         CERROR("Can't resolve route for %s: %d\n",
2700                                libcfs_nid2str(peer->ibp_nid), rc);
2701                 }
2702                 kiblnd_peer_connect_failed(peer, 1, rc);
2703                 kiblnd_peer_decref(peer);
2704                 return rc;                      /* rc != 0 destroys cmid */
2705
2706         case RDMA_CM_EVENT_ROUTE_ERROR:
2707                 peer = (kib_peer_t *)cmid->context;
2708                 CDEBUG(D_NETERROR, "%s: ROUTE ERROR %d\n",
2709                        libcfs_nid2str(peer->ibp_nid), event->status);
2710                 kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
2711                 kiblnd_peer_decref(peer);
2712                 return -EHOSTUNREACH;           /* rc != 0 destroys cmid */
2713
2714         case RDMA_CM_EVENT_ROUTE_RESOLVED:
2715                 peer = (kib_peer_t *)cmid->context;
2716                 CDEBUG(D_NET,"%s Route resolved: %d\n",
2717                        libcfs_nid2str(peer->ibp_nid), event->status);
2718
2719                 if (event->status == 0)
2720                         return kiblnd_active_connect(cmid);
2721
2722                 CDEBUG(D_NETERROR, "Can't resolve route for %s: %d\n",
2723                        libcfs_nid2str(peer->ibp_nid), event->status);
2724                 kiblnd_peer_connect_failed(peer, 1, event->status);
                kiblnd_peer_decref(peer);
                return event->status;           /* rc != 0 destroys cmid */

        case RDMA_CM_EVENT_UNREACHABLE:
                conn = (kib_conn_t *)cmid->context;
                LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
                        conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
                CDEBUG(D_NETERROR, "%s: UNREACHABLE %d\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
                kiblnd_connreq_done(conn, -ENETDOWN);
                kiblnd_conn_decref(conn);
                return 0;

        case RDMA_CM_EVENT_CONNECT_ERROR:
                conn = (kib_conn_t *)cmid->context;
                LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
                        conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
                CDEBUG(D_NETERROR, "%s: CONNECT ERROR %d\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
                kiblnd_connreq_done(conn, -ENOTCONN);
                kiblnd_conn_decref(conn);
                return 0;

        case RDMA_CM_EVENT_REJECTED:
                conn = (kib_conn_t *)cmid->context;
                switch (conn->ibc_state) {
                default:
                        LBUG();

                case IBLND_CONN_PASSIVE_WAIT:
                        CERROR("%s: REJECTED %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid),
                               event->status);
                        kiblnd_connreq_done(conn, -ECONNRESET);
                        break;

                case IBLND_CONN_ACTIVE_CONNECT:
                        kiblnd_rejected(conn, event->status,
                                        (void *)KIBLND_CONN_PARAM(event),
                                        KIBLND_CONN_PARAM_LEN(event));
                        break;
                }
                kiblnd_conn_decref(conn);
                return 0;

        case RDMA_CM_EVENT_ESTABLISHED:
                conn = (kib_conn_t *)cmid->context;
                switch (conn->ibc_state) {
                default:
                        LBUG();

                case IBLND_CONN_PASSIVE_WAIT:
                        CDEBUG(D_NET, "ESTABLISHED (passive): %s\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        kiblnd_connreq_done(conn, 0);
                        break;

                case IBLND_CONN_ACTIVE_CONNECT:
                        CDEBUG(D_NET, "ESTABLISHED (active): %s\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        kiblnd_check_connreply(conn,
                                               (void *)KIBLND_CONN_PARAM(event),
                                               KIBLND_CONN_PARAM_LEN(event));
                        break;
                }
                /* net keeps its ref on conn! */
                return 0;

        case RDMA_CM_EVENT_DISCONNECTED:
                conn = (kib_conn_t *)cmid->context;
                if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
                        CERROR("%s DISCONNECTED\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        kiblnd_connreq_done(conn, -ECONNRESET);
                } else {
                        kiblnd_close_conn(conn, 0);
                }
                kiblnd_conn_decref(conn);
                return 0;

        case RDMA_CM_EVENT_DEVICE_REMOVAL:
                LCONSOLE_ERROR_MSG(0x131,
                                   "Received notification of device removal\n"
                                   "Please shut down LNET to allow this to proceed\n");
                /* Can't remove the network from underneath LNET for now, so
                 * I have to ignore this event */
                return 0;
        }
}

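/* Scan the tx list 'txs' under ibc_lock and return nonzero if any tx on it
 * has passed its deadline.  Queued lists hold txs still waiting to be
 * posted; the active list holds txs that have been posted and/or are
 * awaiting a peer response. */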
int
kiblnd_check_txs (kib_conn_t *conn, struct list_head *txs)
{
        kib_tx_t          *tx;
        struct list_head  *ttmp;
        int                timed_out = 0;

        spin_lock(&conn->ibc_lock);

        list_for_each (ttmp, txs) {
                tx = list_entry (ttmp, kib_tx_t, tx_list);

                if (txs != &conn->ibc_active_txs) {
                        LASSERT (tx->tx_queued);
                } else {
                        LASSERT (!tx->tx_queued);
                        LASSERT (tx->tx_waiting || tx->tx_sending != 0);
                }

                if (time_after_eq (jiffies, tx->tx_deadline)) {
                        timed_out = 1;
                        break;
                }
        }

        spin_unlock(&conn->ibc_lock);
        return timed_out;
}

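/* A connection has timed out if any of its tx queues, pending or active,
 * contains a tx that has passed its deadline. */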
int
kiblnd_conn_timed_out (kib_conn_t *conn)
{
        return  kiblnd_check_txs(conn, &conn->ibc_tx_queue) ||
                kiblnd_check_txs(conn, &conn->ibc_tx_noops) ||
                kiblnd_check_txs(conn, &conn->ibc_tx_queue_rsrvd) ||
                kiblnd_check_txs(conn, &conn->ibc_tx_queue_nocred) ||
                kiblnd_check_txs(conn, &conn->ibc_active_txs);
}

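/* Check every connection of every peer in hash bucket 'idx' for timed-out
 * RDMAs.  On finding one, take an extra ref, drop the global lock, close
 * the connection, then restart the scan from the top of the bucket. */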
void
kiblnd_check_conns (int idx)
{
        struct list_head  *peers = &kiblnd_data.kib_peers[idx];
        struct list_head  *ptmp;
        kib_peer_t        *peer;
        kib_conn_t        *conn;
        struct list_head  *ctmp;
        unsigned long      flags;

 again:
        /* NB. We expect to scan all the peers and find no RDMAs to time
         * out, so we just use a shared (read) lock while we take a
         * look... */
        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, kib_peer_t, ibp_list);

                list_for_each (ctmp, &peer->ibp_conns) {
                        conn = list_entry (ctmp, kib_conn_t, ibc_list);

                        LASSERT (conn->ibc_state == IBLND_CONN_ESTABLISHED);

                        /* In case we have enough credits to return via a
                         * NOOP, but there were no non-blocking tx descs
                         * free to do it last time... */
                        kiblnd_check_sends(conn);

                        if (!kiblnd_conn_timed_out(conn))
                                continue;

                        /* Handle a timeout by closing the whole connection.
                         * We can only be sure RDMA activity has ceased once
                         * the QP has been modified. */

                        kiblnd_conn_addref(conn); /* 1 ref for me... */

                        read_unlock_irqrestore(&kiblnd_data.kib_global_lock,
                                               flags);

                        CERROR("Timed out RDMA with %s\n",
                               libcfs_nid2str(peer->ibp_nid));

                        kiblnd_close_conn(conn, -ETIMEDOUT);
                        kiblnd_conn_decref(conn); /* ...until here */

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
}

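/* Run only in the connd thread: start the RDMA disconnect handshake,
 * finalise the connection, then notify LNET about the peer. */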
void
kiblnd_disconnect_conn (kib_conn_t *conn)
{
        LASSERT (!in_interrupt());
        LASSERT (current == kiblnd_data.kib_connd);
        LASSERT (conn->ibc_state == IBLND_CONN_CLOSING);

        rdma_disconnect(conn->ibc_cmid);
        kiblnd_finalise_conn(conn);

        kiblnd_peer_notify(conn->ibc_peer);
}

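/* Connection daemon: destroys zombie connections, runs the disconnect
 * handshake for closing connections, and sweeps the peer table for RDMA
 * timeouts at a rate that covers every peer within the timeout interval. */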
int
kiblnd_connd (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        kib_conn_t        *conn;
        int                timeout;
        int                i;
        int                dropped_lock;
        int                peer_index = 0;
        unsigned long      deadline = jiffies;

        cfs_daemonize ("kiblnd_connd");
        cfs_block_allsigs ();

        init_waitqueue_entry (&wait, current);
        kiblnd_data.kib_connd = current;

        spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);

        while (!kiblnd_data.kib_shutdown) {

                dropped_lock = 0;

                if (!list_empty (&kiblnd_data.kib_connd_zombies)) {
                        conn = list_entry (kiblnd_data.kib_connd_zombies.next,
                                           kib_conn_t, ibc_list);
                        list_del (&conn->ibc_list);

                        spin_unlock_irqrestore (&kiblnd_data.kib_connd_lock, flags);
                        dropped_lock = 1;

                        kiblnd_destroy_conn(conn);

                        spin_lock_irqsave (&kiblnd_data.kib_connd_lock, flags);
                }

                if (!list_empty (&kiblnd_data.kib_connd_conns)) {
                        conn = list_entry (kiblnd_data.kib_connd_conns.next,
                                           kib_conn_t, ibc_list);
                        list_del (&conn->ibc_list);

                        spin_unlock_irqrestore (&kiblnd_data.kib_connd_lock, flags);
                        dropped_lock = 1;

                        kiblnd_disconnect_conn(conn);
                        kiblnd_conn_decref(conn);

                        spin_lock_irqsave (&kiblnd_data.kib_connd_lock, flags);
                }

                /* careful with the jiffy wrap... */
                timeout = (int)(deadline - jiffies);
                if (timeout <= 0) {
                        const int n = 4;
                        const int p = 1;
                        int       chunk = kiblnd_data.kib_peer_hash_size;

                        spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
                        dropped_lock = 1;

                        /* Time to check for RDMA timeouts on a few more
                         * peers: I do checks every 'p' seconds on a
                         * proportion of the peer table and I need to check
                         * every connection 'n' times within a timeout
                         * interval, to ensure I detect a timeout on any
                         * connection within (n+1)/n times the timeout
                         * interval. */

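                        /* Illustrative (hypothetical) numbers: with a 40s
                         * timeout, n = 4, p = 1 and a 100-bucket peer hash,
                         * chunk = 100 * 4 * 1 / 40 = 10, so the whole table
                         * is swept every 10 seconds, i.e. 4 full sweeps per
                         * timeout interval as required. */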
                        if (*kiblnd_tunables.kib_timeout > n * p)
                                chunk = (chunk * n * p) /
                                        *kiblnd_tunables.kib_timeout;
                        if (chunk == 0)
                                chunk = 1;

                        for (i = 0; i < chunk; i++) {
                                kiblnd_check_conns(peer_index);
                                peer_index = (peer_index + 1) %
                                             kiblnd_data.kib_peer_hash_size;
                        }

                        deadline += p * HZ;
                        spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
                }

                if (dropped_lock)
                        continue;

                /* Nothing to do: sleep for 'timeout' or until woken */
                set_current_state (TASK_INTERRUPTIBLE);
                add_wait_queue (&kiblnd_data.kib_connd_waitq, &wait);
                spin_unlock_irqrestore (&kiblnd_data.kib_connd_lock, flags);

                schedule_timeout (timeout);

                set_current_state (TASK_RUNNING);
                remove_wait_queue (&kiblnd_data.kib_connd_waitq, &wait);
                spin_lock_irqsave (&kiblnd_data.kib_connd_lock, flags);
        }

        spin_unlock_irqrestore (&kiblnd_data.kib_connd_lock, flags);

        kiblnd_thread_fini();
        return (0);
}

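/* Async QP event handler: IB_EVENT_COMM_EST just confirms the connection
 * is up; anything else is unexpected and logged as an error. */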
void
kiblnd_qp_event(struct ib_event *event, void *arg)
{
        kib_conn_t *conn = arg;

        switch (event->event) {
        case IB_EVENT_COMM_EST:
                CDEBUG(D_NET, "%s established\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                return;

        default:
                CERROR("%s: Async QP event type %d\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
                return;
        }
}

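/* Dispatch a work completion to the appropriate handler according to the
 * type encoded in its work request id. */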
void
kiblnd_complete (struct ib_wc *wc)
{
        switch (kiblnd_wreqid2type(wc->wr_id)) {
        default:
                LBUG();

        case IBLND_WID_RDMA:
                /* We only get RDMA completion notification if it fails.  All
                 * subsequent work items, including the final SEND, will fail
                 * too.  However we can't print any more info about the
                 * failing RDMA because 'tx' might be back on the idle list,
                 * or even reused already, if we didn't manage to post all
                 * our work items. */
                CDEBUG(D_NETERROR, "RDMA (tx: %p) failed: %d\n",
                       kiblnd_wreqid2ptr(wc->wr_id), wc->status);
                return;

        case IBLND_WID_TX:
                kiblnd_tx_complete(kiblnd_wreqid2ptr(wc->wr_id), wc->status);
                return;

        case IBLND_WID_RX:
                kiblnd_rx_complete(kiblnd_wreqid2ptr(wc->wr_id), wc->status,
                                   wc->byte_len);
                return;
        }
}

void
kiblnd_cq_completion (struct ib_cq *cq, void *arg)
{
        /* NB I'm not allowed to schedule this conn once its refcount has
         * reached 0.  Since fundamentally I'm racing with scheduler threads
         * consuming my CQ, I could be called after all completions have
         * occurred.  But in that case ibc_nrx == 0 && ibc_nsends_posted == 0
         * and this CQ is about to be destroyed, so I NOOP. */
        kib_conn_t     *conn = (kib_conn_t *)arg;
        unsigned long   flags;

        LASSERT (cq == conn->ibc_cq);

        spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);

        conn->ibc_ready = 1;

        if (!conn->ibc_scheduled &&
            (conn->ibc_nrx > 0 ||
             conn->ibc_nsends_posted > 0)) {
                kiblnd_conn_addref(conn); /* +1 ref for sched_conns */
                conn->ibc_scheduled = 1;
                list_add_tail(&conn->ibc_sched_list,
                              &kiblnd_data.kib_sched_conns);
                wake_up(&kiblnd_data.kib_sched_waitq);
        }

        spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock, flags);
}

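/* Async CQ event handler: any event here (e.g. a CQ overrun) indicates a
 * serious problem, so just log it. */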
void
kiblnd_cq_event(struct ib_event *event, void *arg)
{
        kib_conn_t *conn = arg;

        CERROR("%s: Async CQ event type %d\n",
               libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
}

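/* Scheduler thread: drains completions from the CQs of scheduled
 * connections one work entry at a time, re-arming each CQ when it is found
 * empty, and requeues a connection whenever more work may remain on it. */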
int
kiblnd_scheduler(void *arg)
{
        long            id = (long)arg;
        wait_queue_t    wait;
        char            name[16];
        unsigned long   flags;
        kib_conn_t     *conn;
        struct ib_wc    wc;
        int             rc;
        int             did_something;
        int             busy_loops = 0;

        snprintf(name, sizeof(name), "kiblnd_sd_%02ld", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);

        while (!kiblnd_data.kib_shutdown) {
                if (busy_loops++ >= IBLND_RESCHED) {
                        spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock,
                                               flags);

                        our_cond_resched();
                        busy_loops = 0;

                        spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);
                }

                did_something = 0;

                if (!list_empty(&kiblnd_data.kib_sched_conns)) {
                        conn = list_entry(kiblnd_data.kib_sched_conns.next,
                                          kib_conn_t, ibc_sched_list);
                        /* take over kib_sched_conns' ref on conn... */
                        LASSERT(conn->ibc_scheduled);
                        list_del(&conn->ibc_sched_list);
                        conn->ibc_ready = 0;

                        spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock,
                                               flags);

                        rc = ib_poll_cq(conn->ibc_cq, 1, &wc);
                        if (rc == 0) {
                                rc = ib_req_notify_cq(conn->ibc_cq,
                                                      IB_CQ_NEXT_COMP);
                                if (rc < 0) {
                                        CWARN("%s: ib_req_notify_cq failed: %d, "
                                              "closing connection\n",
                                              libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
                                        kiblnd_close_conn(conn, -EIO);
                                        kiblnd_conn_decref(conn);
                                        spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);
                                        continue;
                                }

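                                /* Poll again after re-arming: a completion
                                 * that raced with the notify request would
                                 * otherwise be missed until the next CQ
                                 * event. */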
                                rc = ib_poll_cq(conn->ibc_cq, 1, &wc);
                        }

                        if (rc < 0) {
                                CWARN("%s: ib_poll_cq failed: %d, "
                                      "closing connection\n",
                                      libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
                                kiblnd_close_conn(conn, -EIO);
                                kiblnd_conn_decref(conn);
                                spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);
                                continue;
                        }

                        spin_lock_irqsave(&kiblnd_data.kib_sched_lock,
                                          flags);

                        if (rc != 0 || conn->ibc_ready) {
                                /* There may be another completion waiting; get
                                 * another scheduler to check while I handle
                                 * this one... */
                                kiblnd_conn_addref(conn); /* +1 ref for sched_conns */
                                list_add_tail(&conn->ibc_sched_list,
                                              &kiblnd_data.kib_sched_conns);
                                wake_up(&kiblnd_data.kib_sched_waitq);
                        } else {
                                conn->ibc_scheduled = 0;
                        }

                        if (rc != 0) {
                                spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock,
                                                       flags);

                                kiblnd_complete(&wc);

                                spin_lock_irqsave(&kiblnd_data.kib_sched_lock,
                                                  flags);
                        }

                        kiblnd_conn_decref(conn); /* ...drop my ref from above */
                        did_something = 1;
                }

                if (did_something)
                        continue;

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue_exclusive(&kiblnd_data.kib_sched_waitq, &wait);
                spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock, flags);

                schedule();
                busy_loops = 0;

                remove_wait_queue(&kiblnd_data.kib_sched_waitq, &wait);
                set_current_state(TASK_RUNNING);
                spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);
        }

        spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock, flags);

        kiblnd_thread_fini();
        return (0);
}