Whamcloud - gitweb
Mass conversion of all copyright messages to Oracle.
[fs/lustre-release.git] / lnet / klnds / iiblnd / iiblnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/iiblnd/iiblnd_cb.c
37  *
38  * Author: Eric Barton <eric@bartonsoftware.com>
39  */
40
41 #include "iiblnd.h"
42
void
hexdump(char *string, void *ptr, int len)
{
        /* Debug aid: dump 'len' bytes at 'ptr' in hex, labelled 'string'.
         * Compiled in but disabled by default; the original version simply
         * had an unconditional 'return' at the top which left all the code
         * below unreachable with no explanation.  Flip 'enabled' to turn
         * the dump back on. */
        static const int enabled = 0;
        unsigned char *c = ptr;
        int i;

        if (!enabled)
                return;

        /* Sanity-bound the dump so a garbage length can't flood the log */
        if (len < 0 || len > 2048) {
                printk("hexdump: invalid length %d\n", len);
                return;
        }

        printk("%d bytes of '%s' from 0x%p\n", len, string, ptr);

        for (i = 0; i < len;) {
                printk("%02x", *(c++));
                i++;
                if (!(i & 15)) {
                        /* newline every 16 bytes */
                        printk("\n");
                } else if (!(i & 1)) {
                        /* space every 2 bytes */
                        printk(" ");
                }
        }

        /* terminate a partial final line */
        if (len & 15) {
                printk("\n");
        }
}
72
/* Retire a completed tx descriptor: drop its conn ref, return it to the
 * global idle list, then finalize any LNet messages it carried with the
 * tx's final status.  Must be called in thread context (may sleep in
 * lnet_finalize) and only once the tx is fully quiescent. */
void
kibnal_tx_done (kib_tx_t *tx)
{
        lnet_msg_t *lntmsg[2];
        int         rc = tx->tx_status;        /* status to report to LNet */
        int         i;

        LASSERT (!in_interrupt());
        LASSERT (!tx->tx_queued);               /* mustn't be queued for sending */
        LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting sent callback */
        LASSERT (!tx->tx_waiting);              /* mustn't be awaiting peer response */

#if IBNAL_USE_FMR
        /* Handle unmapping if required */
#endif
        /* tx may have up to 2 lnet msgs to finalise */
        lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
        lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
        
        if (tx->tx_conn != NULL) {
                /* drop the ref the tx held on its connection */
                kibnal_conn_decref(tx->tx_conn);
                tx->tx_conn = NULL;
        }

        /* reset for next use before returning to the idle pool */
        tx->tx_nwrq = 0;
        tx->tx_status = 0;

        spin_lock(&kibnal_data.kib_tx_lock);

        list_add (&tx->tx_list, &kibnal_data.kib_idle_txs);

        spin_unlock(&kibnal_data.kib_tx_lock);

        /* delay finalize until my descs have been freed */
        for (i = 0; i < 2; i++) {
                if (lntmsg[i] == NULL)
                        continue;

                lnet_finalize (kibnal_data.kib_ni, lntmsg[i], rc);
        }
}
114
115 kib_tx_t *
116 kibnal_get_idle_tx (void) 
117 {
118         kib_tx_t      *tx;
119         
120         spin_lock(&kibnal_data.kib_tx_lock);
121
122         if (list_empty (&kibnal_data.kib_idle_txs)) {
123                 spin_unlock(&kibnal_data.kib_tx_lock);
124                 return NULL;
125         }
126
127         tx = list_entry (kibnal_data.kib_idle_txs.next, kib_tx_t, tx_list);
128         list_del (&tx->tx_list);
129
130         /* Allocate a new completion cookie.  It might not be needed,
131          * but we've got a lock right now and we're unlikely to
132          * wrap... */
133         tx->tx_cookie = kibnal_data.kib_next_tx_cookie++;
134
135         spin_unlock(&kibnal_data.kib_tx_lock);
136
137         LASSERT (tx->tx_nwrq == 0);
138         LASSERT (!tx->tx_queued);
139         LASSERT (tx->tx_sending == 0);
140         LASSERT (!tx->tx_waiting);
141         LASSERT (tx->tx_status == 0);
142         LASSERT (tx->tx_conn == NULL);
143         LASSERT (tx->tx_lntmsg[0] == NULL);
144         LASSERT (tx->tx_lntmsg[1] == NULL);
145         
146         return tx;
147 }
148
/* (Re)post a receive buffer on its connection's QP.  'credit' returns a
 * flow-control credit to the peer; 'rsrvd_credit' returns an RDMA-reply
 * reservation (only valid for peers whose protocol version reserves rx
 * buffers for RDMA replies).  Returns 0 on success or if the connection
 * is already shutting down; -EIO on a post failure (which also closes
 * the connection).  On any no-more-posts path the rx's ref on the
 * connection is dropped. */
int
kibnal_post_rx (kib_rx_t *rx, int credit, int rsrvd_credit)
{
        kib_conn_t   *conn = rx->rx_conn;
        int           rc = 0;
        FSTATUS       frc;

        LASSERT (!in_interrupt());
        /* old peers don't reserve rxs for RDMA replies */
        LASSERT (!rsrvd_credit ||
                 conn->ibc_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
        
        /* local data segment describing the message buffer */
        rx->rx_gl = (IB_LOCAL_DATASEGMENT) {
                .Address = rx->rx_hca_msg,
                .Lkey    = kibnal_data.kib_whole_mem.md_lkey,
                .Length  = IBNAL_MSG_SIZE,
        };

        /* work request; WorkReqId encodes the rx pointer + type tag so the
         * completion handler can recover it */
        rx->rx_wrq = (IB_WORK_REQ2) {
                .Next          = NULL,
                .WorkReqId     = kibnal_ptr2wreqid(rx, IBNAL_WID_RX),
                .MessageLen    = IBNAL_MSG_SIZE,
                .DSList        = &rx->rx_gl,
                .DSListDepth   = 1,
                .Operation     = WROpRecv,
        };

        LASSERT (conn->ibc_state >= IBNAL_CONN_CONNECTING);
        LASSERT (rx->rx_nob >= 0);              /* not posted */

        CDEBUG(D_NET, "posting rx [%d %x "LPX64"]\n", 
               rx->rx_wrq.DSList->Length,
               rx->rx_wrq.DSList->Lkey,
               rx->rx_wrq.DSList->Address);

        if (conn->ibc_state > IBNAL_CONN_ESTABLISHED) {
                /* No more posts for this rx; so lose its ref */
                kibnal_conn_decref(conn);
                return 0;
        }
        
        rx->rx_nob = -1;                        /* flag posted */
        /* barrier: the posted flag must be visible before the completion
         * can fire and read rx_nob */
        mb();

        frc = iba_post_recv2(conn->ibc_qp, &rx->rx_wrq, NULL);
        if (frc == FSUCCESS) {
                if (credit || rsrvd_credit) {
                        spin_lock(&conn->ibc_lock);

                        if (credit)
                                conn->ibc_outstanding_credits++;
                        if (rsrvd_credit)
                                conn->ibc_reserved_credits++;

                        spin_unlock(&conn->ibc_lock);

                        /* returned credits may unblock queued sends */
                        kibnal_check_sends(conn);
                }
                return 0;
        }
        
        CERROR ("post rx -> %s failed %d\n", 
                libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
        rc = -EIO;
        kibnal_close_conn(rx->rx_conn, rc);
        /* No more posts for this rx; so lose its ref */
        kibnal_conn_decref(conn);
        return rc;
}
218
219 int
220 kibnal_post_receives (kib_conn_t *conn)
221 {
222         int    i;
223         int    rc;
224
225         LASSERT (conn->ibc_state == IBNAL_CONN_CONNECTING);
226
227         for (i = 0; i < IBNAL_RX_MSGS; i++) {
228                 /* +1 ref for rx desc.  This ref remains until kibnal_post_rx
229                  * fails (i.e. actual failure or we're disconnecting) */
230                 kibnal_conn_addref(conn);
231                 rc = kibnal_post_rx (&conn->ibc_rxs[i], 0, 0);
232                 if (rc != 0)
233                         return rc;
234         }
235
236         return 0;
237 }
238
239 kib_tx_t *
240 kibnal_find_waiting_tx_locked(kib_conn_t *conn, int txtype, __u64 cookie)
241 {
242         struct list_head   *tmp;
243         
244         list_for_each(tmp, &conn->ibc_active_txs) {
245                 kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);
246                 
247                 LASSERT (!tx->tx_queued);
248                 LASSERT (tx->tx_sending != 0 || tx->tx_waiting);
249
250                 if (tx->tx_cookie != cookie)
251                         continue;
252
253                 if (tx->tx_waiting &&
254                     tx->tx_msg->ibm_type == txtype)
255                         return tx;
256
257                 CWARN("Bad completion: %swaiting, type %x (wanted %x)\n",
258                       tx->tx_waiting ? "" : "NOT ",
259                       tx->tx_msg->ibm_type, txtype);
260         }
261         return NULL;
262 }
263
/* Handle an incoming completion message (PUT_NAK/PUT_DONE/GET_DONE) for
 * the tx identified by 'cookie'.  An unmatched completion is a protocol
 * error and closes the connection.  If the tx is now idle (not queued,
 * not mid-send) it is removed from the active list and retired. */
void
kibnal_handle_completion(kib_conn_t *conn, int txtype, int status, __u64 cookie)
{
        kib_tx_t    *tx;
        int          idle;

        spin_lock(&conn->ibc_lock);

        tx = kibnal_find_waiting_tx_locked(conn, txtype, cookie);
        if (tx == NULL) {
                spin_unlock(&conn->ibc_lock);

                CWARN("Unmatched completion type %x cookie "LPX64" from %s\n",
                      txtype, cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                kibnal_close_conn (conn, -EPROTO);
                return;
        }

        if (tx->tx_status == 0) {               /* success so far */
                if (status < 0) {               /* failed? */
                        tx->tx_status = status;
                } else if (txtype == IBNAL_MSG_GET_REQ) {
                        /* for GET, a non-negative status is the reply length */
                        lnet_set_reply_msg_len(kibnal_data.kib_ni,
                                               tx->tx_lntmsg[1], status);
                }
        }
        
        tx->tx_waiting = 0;

        /* idle == safe to retire: not queued and no send in flight */
        idle = !tx->tx_queued && (tx->tx_sending == 0);
        if (idle)
                list_del(&tx->tx_list);

        spin_unlock(&conn->ibc_lock);
        
        /* retire outside the lock */
        if (idle)
                kibnal_tx_done(tx);
}
302
303 void
304 kibnal_send_completion (kib_conn_t *conn, int type, int status, __u64 cookie) 
305 {
306         kib_tx_t    *tx = kibnal_get_idle_tx();
307         
308         if (tx == NULL) {
309                 CERROR("Can't get tx for completion %x for %s\n",
310                        type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
311                 return;
312         }
313         
314         tx->tx_msg->ibm_u.completion.ibcm_status = status;
315         tx->tx_msg->ibm_u.completion.ibcm_cookie = cookie;
316         kibnal_init_tx_msg(tx, type, sizeof(kib_completion_msg_t));
317         
318         kibnal_queue_tx(tx, conn);
319 }
320
/* Dispatch a successfully-received, unpacked message.  Banks any
 * piggy-backed flow-control credits, handles the message by type, closes
 * the connection on protocol errors, and finally reposts the rx buffer
 * (returning the appropriate credit class) unless LNet has taken
 * ownership of it. */
void
kibnal_handle_rx (kib_rx_t *rx)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        int           credits = msg->ibm_credits;   /* credits returned by peer */
        kib_tx_t     *tx;
        int           rc = 0;
        int           repost = 1;                   /* repost rx unless LNet keeps it */
        int           rsrvd_credit = 0;             /* rx consumed a reserved (RDMA-reply) buffer */
        int           rc2;

        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);

        CDEBUG (D_NET, "Received %x[%d] from %s\n",
                msg->ibm_type, credits, libcfs_nid2str(conn->ibc_peer->ibp_nid));
        
        if (credits != 0) {
                /* Have I received credits that will let me send? */
                spin_lock(&conn->ibc_lock);
                conn->ibc_credits += credits;
                spin_unlock(&conn->ibc_lock);

                kibnal_check_sends(conn);
        }

        switch (msg->ibm_type) {
        default:
                CERROR("Bad IBNAL message type %x from %s\n",
                       msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                rc = -EPROTO;
                break;

        case IBNAL_MSG_NOOP:
                /* credit-return only; nothing more to do */
                break;

        case IBNAL_MSG_IMMEDIATE:
                rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.immediate.ibim_hdr,
                                msg->ibm_srcnid, rx, 0);
                repost = rc < 0;                /* repost on error */
                break;
                
        case IBNAL_MSG_PUT_REQ:
                rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.putreq.ibprm_hdr,
                                msg->ibm_srcnid, rx, 1);
                repost = rc < 0;                /* repost on error */
                break;

        case IBNAL_MSG_PUT_NAK:
                rsrvd_credit = 1;               /* rdma reply (was pre-reserved) */

                CWARN ("PUT_NACK from %s\n", libcfs_nid2str(conn->ibc_peer->ibp_nid));
                kibnal_handle_completion(conn, IBNAL_MSG_PUT_REQ, 
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBNAL_MSG_PUT_ACK:
                rsrvd_credit = 1;               /* rdma reply (was pre-reserved) */

                /* find (and claim) the PUT_REQ tx this acknowledges */
                spin_lock(&conn->ibc_lock);
                tx = kibnal_find_waiting_tx_locked(conn, IBNAL_MSG_PUT_REQ,
                                                   msg->ibm_u.putack.ibpam_src_cookie);
                if (tx != NULL)
                        list_del(&tx->tx_list);
                spin_unlock(&conn->ibc_lock);

                if (tx == NULL) {
                        CERROR("Unmatched PUT_ACK from %s\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        rc = -EPROTO;
                        break;
                }

                LASSERT (tx->tx_waiting);
                /* CAVEAT EMPTOR: I could be racing with tx_complete, but...
                 * (a) I can overwrite tx_msg since my peer has received it!
                 * (b) tx_waiting set tells tx_complete() it's not done. */

                tx->tx_nwrq = 0;                /* overwrite PUT_REQ */

                /* set up the RDMA write of the PUT payload into the peer's
                 * advertised sink buffer */
                rc2 = kibnal_init_rdma(tx, IBNAL_MSG_PUT_DONE, 
                                       kibnal_rd_size(&msg->ibm_u.putack.ibpam_rd),
                                       &msg->ibm_u.putack.ibpam_rd,
                                       msg->ibm_u.putack.ibpam_dst_cookie);
                if (rc2 < 0)
                        CERROR("Can't setup rdma for PUT to %s: %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc2);

                spin_lock(&conn->ibc_lock);
                if (tx->tx_status == 0 && rc2 < 0)
                        tx->tx_status = rc2;
                tx->tx_waiting = 0;             /* clear waiting and queue atomically */
                kibnal_queue_tx_locked(tx, conn);
                spin_unlock(&conn->ibc_lock);
                break;
                
        case IBNAL_MSG_PUT_DONE:
                /* This buffer was pre-reserved by not returning the credit
                 * when the PUT_REQ's buffer was reposted, so I just return it
                 * now */
                kibnal_handle_completion(conn, IBNAL_MSG_PUT_ACK,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBNAL_MSG_GET_REQ:
                rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.get.ibgm_hdr,
                                msg->ibm_srcnid, rx, 1);
                repost = rc < 0;                /* repost on error */
                break;

        case IBNAL_MSG_GET_DONE:
                rsrvd_credit = 1;               /* rdma reply (was pre-reserved) */

                kibnal_handle_completion(conn, IBNAL_MSG_GET_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;
        }

        if (rc < 0)                             /* protocol error */
                kibnal_close_conn(conn, rc);

        if (repost) {
                if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD)
                        rsrvd_credit = 0;       /* peer isn't pre-reserving */

                kibnal_post_rx(rx, !rsrvd_credit, rsrvd_credit);
        }
}
452
/* Receive completion handler.  Validates the work completion and the
 * unpacked message (peer identity, incarnation stamps, sequence number),
 * then either handles the rx, stashes it on the early-rx list if the
 * connection isn't fully established yet, or closes the connection and
 * drops the rx's conn ref on failure. */
void
kibnal_rx_complete (IB_WORK_COMPLETION *wc, __u64 rxseq)
{
        /* recover the rx descriptor encoded in the work request id */
        kib_rx_t     *rx = (kib_rx_t *)kibnal_wreqid2ptr(wc->WorkReqId);
        int           nob = wc->Length;
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        unsigned long flags;
        int           rc;
        int           err = -EIO;

        LASSERT (rx->rx_nob < 0);               /* was posted */
        rx->rx_nob = 0;                         /* isn't now */
        /* barrier pairs with the mb() after setting rx_nob = -1 in
         * kibnal_post_rx */
        mb();

        /* receives complete with error in any case after we've started
         * disconnecting */
        if (conn->ibc_state > IBNAL_CONN_ESTABLISHED)
                goto ignore;

        if (wc->Status != WRStatusSuccess) {
                CERROR("Rx from %s failed: %d\n", 
                       libcfs_nid2str(conn->ibc_peer->ibp_nid), wc->Status);
                goto failed;
        }

        rc = kibnal_unpack_msg(msg, conn->ibc_version, nob);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from %s\n",
                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                goto failed;
        }

        rx->rx_nob = nob;                       /* Now I know nob > 0 */
        mb();

        /* reject messages from a stale peer instance or addressed to a
         * stale instance of me */
        if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
            msg->ibm_dstnid != kibnal_data.kib_ni->ni_nid ||
            msg->ibm_srcstamp != conn->ibc_incarnation ||
            msg->ibm_dststamp != kibnal_data.kib_incarnation) {
                CERROR ("Stale rx from %s\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
                err = -ESTALE;
                goto failed;
        }

        if (msg->ibm_seq != rxseq) {
                CERROR ("Out-of-sequence rx from %s"
                        ": got "LPD64" but expected "LPD64"\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid),
                        msg->ibm_seq, rxseq);
                goto failed;
        }

        /* set time last known alive */
        kibnal_peer_alive(conn->ibc_peer);

        /* racing with connection establishment/teardown! */

        if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
                /* must check holding global lock to eliminate race */
                if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                        /* defer until the connection is established */
                        list_add_tail(&rx->rx_list, &conn->ibc_early_rxs);
                        write_unlock_irqrestore(&kibnal_data.kib_global_lock, 
                                                flags);
                        return;
                }
                write_unlock_irqrestore(&kibnal_data.kib_global_lock, 
                                        flags);
        }
        kibnal_handle_rx(rx);
        return;
        
 failed:
        kibnal_close_conn(conn, err);
 ignore:
        /* Don't re-post rx & drop its ref on conn */
        kibnal_conn_decref(conn);
}
533
534 struct page *
535 kibnal_kvaddr_to_page (unsigned long vaddr)
536 {
537         struct page *page;
538
539         if (vaddr >= VMALLOC_START &&
540             vaddr < VMALLOC_END) {
541                 page = vmalloc_to_page ((void *)vaddr);
542                 LASSERT (page != NULL);
543                 return page;
544         }
545 #ifdef CONFIG_HIGHMEM
546         if (vaddr >= PKMAP_BASE &&
547             vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE)) {
548                 /* No highmem pages only used for bulk (kiov) I/O */
549                 CERROR("find page for address in highmem\n");
550                 LBUG();
551         }
552 #endif
553         page = virt_to_page (vaddr);
554         LASSERT (page != NULL);
555         return page;
556 }
557
558 #if !IBNAL_USE_FMR
559 int
560 kibnal_append_rdfrag(kib_rdma_desc_t *rd, int active, struct page *page, 
561                      unsigned long page_offset, unsigned long len)
562 {
563         kib_rdma_frag_t *frag = &rd->rd_frags[rd->rd_nfrag];
564
565         if (rd->rd_nfrag >= IBNAL_MAX_RDMA_FRAGS) {
566                 CERROR ("Too many RDMA fragments\n");
567                 return -EMSGSIZE;
568         }
569
570         if (active) {
571                 if (rd->rd_nfrag == 0)
572                         rd->rd_key = kibnal_data.kib_whole_mem.md_lkey;
573         } else {
574                 if (rd->rd_nfrag == 0)
575                         rd->rd_key = kibnal_data.kib_whole_mem.md_rkey;
576         }
577
578         frag->rf_nob  = len;
579         frag->rf_addr = kibnal_data.kib_whole_mem.md_addr +
580                         lnet_page2phys(page) + page_offset;
581
582         CDEBUG(D_NET,"map key %x frag [%d]["LPX64" for %d]\n", 
583                rd->rd_key, rd->rd_nfrag, frag->rf_addr, frag->rf_nob);
584
585         rd->rd_nfrag++;
586         return 0;
587 }
588
/* Build an RDMA descriptor covering 'nob' bytes of the iovec 'iov',
 * starting 'offset' bytes in.  Each page-contiguous piece becomes one
 * fragment.  Returns 0, -EFAULT if a page lookup fails, or -EMSGSIZE
 * if too many fragments are needed. */
int
kibnal_setup_rd_iov(kib_tx_t *tx, kib_rdma_desc_t *rd, int active,
                    unsigned int niov, struct iovec *iov, int offset, int nob)
                 
{
        int           fragnob;
        int           rc;
        unsigned long vaddr;
        struct page  *page;
        int           page_offset;

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT ((rd != tx->tx_rd) == !active);

        /* skip wholly-consumed leading iovec entries */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        rd->rd_nfrag = 0;
        do {
                LASSERT (niov > 0);

                vaddr = ((unsigned long)iov->iov_base) + offset;
                page_offset = vaddr & (PAGE_SIZE - 1);
                page = kibnal_kvaddr_to_page(vaddr);
                if (page == NULL) {
                        CERROR ("Can't find page\n");
                        return -EFAULT;
                }

                /* fragment is bounded by the iovec entry and the page end */
                fragnob = min((int)(iov->iov_len - offset), nob);
                fragnob = min(fragnob, (int)PAGE_SIZE - page_offset);

                rc = kibnal_append_rdfrag(rd, active, page, 
                                          page_offset, fragnob);
                if (rc != 0)
                        return rc;

                /* advance within this entry, or move to the next one */
                if (offset + fragnob < iov->iov_len) {
                        offset += fragnob;
                } else {
                        offset = 0;
                        iov++;
                        niov--;
                }
                nob -= fragnob;
        } while (nob > 0);
        
        return 0;
}
643
/* Build an RDMA descriptor covering 'nob' bytes of the kiov (page
 * vector) 'kiov', starting 'offset' bytes in; one fragment per kiov
 * entry.  Returns 0 or the error from kibnal_append_rdfrag. */
int
kibnal_setup_rd_kiov (kib_tx_t *tx, kib_rdma_desc_t *rd, int active,
                      int nkiov, lnet_kiov_t *kiov, int offset, int nob)
{
        int            fragnob;
        int            rc;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT ((rd != tx->tx_rd) == !active);

        /* skip wholly-consumed leading kiov entries */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        rd->rd_nfrag = 0;
        do {
                LASSERT (nkiov > 0);
                fragnob = min((int)(kiov->kiov_len - offset), nob);
                
                rc = kibnal_append_rdfrag(rd, active, kiov->kiov_page,
                                          kiov->kiov_offset + offset,
                                          fragnob);
                if (rc != 0)
                        return rc;

                /* offset only applies within the first entry */
                offset = 0;
                kiov++;
                nkiov--;
                nob -= fragnob;
        } while (nob > 0);

        return 0;
}
683 #else
/* FMR path: map the physical pages collected in tx->tx_pages for RDMA.
 * NOTE(review): this is effectively a stub — the actual registration
 * call is commented out below and the function always returns -EINVAL,
 * so the FMR build appears non-functional here; confirm before enabling
 * IBNAL_USE_FMR. */
int
kibnal_map_tx (kib_tx_t *tx, kib_rdma_desc_t *rd, int active,
               int npages, unsigned long page_offset, int nob)
{
        IB_ACCESS_CONTROL access = {0,};
        FSTATUS           frc;

        LASSERT ((rd != tx->tx_rd) == !active);
        LASSERT (!tx->tx_md.md_active);
        LASSERT (tx->tx_md.md_fmrcount > 0);
        LASSERT (page_offset < PAGE_SIZE);
        LASSERT (npages >= (1 + ((page_offset + nob - 1)>>PAGE_SHIFT)));
        LASSERT (npages <= LNET_MAX_IOV);

        if (!active) {
                /* passive side: peer writes into this memory */
                // access.s.MWBindable = 1;
                access.s.LocalWrite = 1;
                access.s.RdmaWrite = 1;
        }

        /* Map the memory described by tx->tx_pages
        frc = iibt_register_physical_memory(kibnal_data.kib_hca,
                                            IBNAL_RDMA_BASE,
                                            tx->tx_pages, npages,
                                            page_offset,
                                            kibnal_data.kib_pd,
                                            access,
                                            &tx->tx_md.md_handle,
                                            &tx->tx_md.md_addr,
                                            &tx->tx_md.md_lkey,
                                            &tx->tx_md.md_rkey);
        */
        return -EINVAL;
}
718
/* FMR path: gather the physical pages backing a single virtually
 * contiguous iovec range into tx->tx_pages, then map them via
 * kibnal_map_tx.  Multiple iovec fragments are rejected (-EMSGSIZE);
 * a failed page lookup returns -EFAULT. */
int
kibnal_setup_rd_iov (kib_tx_t *tx, kib_rdma_desc_t *rd, int active,
                     unsigned int niov, struct iovec *iov, int offset, int nob)
                 
{
        int           resid;
        int           fragnob;
        struct page  *page;
        int           npages;
        unsigned long page_offset;
        unsigned long vaddr;

        LASSERT (nob > 0);
        LASSERT (niov > 0);

        /* skip wholly-consumed leading iovec entries */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        /* the whole range must fit in this single iovec entry */
        if (nob > iov->iov_len - offset) {
                CERROR ("Can't map multiple vaddr fragments\n");
                return (-EMSGSIZE);
        }

        vaddr = ((unsigned long)iov->iov_base) + offset;
        
        page_offset = vaddr & (PAGE_SIZE - 1);
        resid = nob;
        npages = 0;

        do {
                LASSERT (npages < LNET_MAX_IOV);

                page = kibnal_kvaddr_to_page(vaddr);
                if (page == NULL) {
                        CERROR("Can't find page for %lu\n", vaddr);
                        return -EFAULT;
                }

                tx->tx_pages[npages++] = lnet_page2phys(page);

                /* advance to the next page boundary */
                fragnob = PAGE_SIZE - (vaddr & (PAGE_SIZE - 1));
                vaddr += fragnob;
                resid -= fragnob;

        } while (resid > 0);

        return kibnal_map_tx(tx, rd, active, npages, page_offset, nob);
}
771
/* FMR path: gather the physical pages of a kiov range into tx->tx_pages
 * and map them via kibnal_map_tx.  The pages must form a gapless
 * physical run: only the first page may start at a non-zero offset and
 * only the last may end short of PAGE_SIZE; otherwise -EINVAL. */
int
kibnal_setup_rd_kiov (kib_tx_t *tx, kib_rdma_desc_t *rd, int active,
                      int nkiov, lnet_kiov_t *kiov, int offset, int nob)
{
        int            resid;
        int            npages;
        unsigned long  page_offset;
        
        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (nkiov <= LNET_MAX_IOV);
        LASSERT (!tx->tx_md.md_active);
        LASSERT ((rd != tx->tx_rd) == !active);

        /* skip wholly-consumed leading kiov entries */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        page_offset = kiov->kiov_offset + offset;
        
        /* count down from the start of the first entry, not 'offset' in */
        resid = offset + nob;
        npages = 0;

        do {
                LASSERT (npages < LNET_MAX_IOV);
                LASSERT (nkiov > 0);

                if ((npages > 0 && kiov->kiov_offset != 0) ||
                    (resid > kiov->kiov_len && 
                     (kiov->kiov_offset + kiov->kiov_len) != PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR ("Can't make payload contiguous in I/O VM:"
                                "page %d, offset %d, len %d \n",
                                npages, kiov->kiov_offset, kiov->kiov_len);
                        
                        return -EINVAL;
                }

                tx->tx_pages[npages++] = lnet_page2phys(kiov->kiov_page);
                resid -= kiov->kiov_len;
                kiov++;
                nkiov--;
        } while (resid > 0);

        return kibnal_map_tx(tx, rd, active, npages, page_offset, nob);
}
823 #endif
824
825 kib_conn_t *
826 kibnal_find_conn_locked (kib_peer_t *peer)
827 {
828         struct list_head *tmp;
829
830         /* just return the first connection */
831         list_for_each (tmp, &peer->ibp_conns) {
832                 return (list_entry(tmp, kib_conn_t, ibc_list));
833         }
834
835         return (NULL);
836 }
837
/* Service 'conn's send queues: promote txs waiting on reserved credits,
 * queue an explicit NOOP if accumulated credits (or a keepalive) must be
 * returned, then post as many queued txs as flow-control credits and QP
 * send slots allow.  Takes and drops ibc_lock internally; must NOT be
 * called with it held. */
void
kibnal_check_sends (kib_conn_t *conn)
{
        kib_tx_t       *tx;
        FSTATUS         frc;
        int             rc;
        int             consume_cred;
        int             done;

        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
        
        spin_lock(&conn->ibc_lock);

        LASSERT (conn->ibc_nsends_posted <=
                *kibnal_tunables.kib_concurrent_sends);
        LASSERT (conn->ibc_reserved_credits >= 0);
        
        /* Move txs waiting for a reserved credit onto the normal queue as
         * credits become available (only protocol versions that reserve
         * credits for RDMA replies use ibc_tx_queue_rsrvd) */
        while (conn->ibc_reserved_credits > 0 &&
               !list_empty(&conn->ibc_tx_queue_rsrvd)) {
                LASSERT (conn->ibc_version != 
                         IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
                tx = list_entry(conn->ibc_tx_queue_rsrvd.next,
                                kib_tx_t, tx_list);
                list_del(&tx->tx_list);
                list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
                conn->ibc_reserved_credits--;
        }

        /* Nothing queued but credits have piled up (or a keepalive is
         * due)?  Queue a NOOP purely to return credits to the peer.
         * NB ibc_lock is dropped around the tx allocation and the
         * condition re-checked implicitly by the NOOP-discard logic in
         * the loop below. */
        if (list_empty(&conn->ibc_tx_queue) &&
            list_empty(&conn->ibc_tx_queue_nocred) &&
            (conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER ||
             kibnal_send_keepalive(conn))) {
                spin_unlock(&conn->ibc_lock);
                
                tx = kibnal_get_idle_tx();
                if (tx != NULL)
                        kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);

                spin_lock(&conn->ibc_lock);
                
                if (tx != NULL)
                        kibnal_queue_tx_locked(tx, conn);
        }

        for (;;) {
                /* txs that don't consume a credit take priority */
                if (!list_empty(&conn->ibc_tx_queue_nocred)) {
                        LASSERT (conn->ibc_version != 
                                 IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
                        tx = list_entry (conn->ibc_tx_queue_nocred.next, 
                                         kib_tx_t, tx_list);
                        consume_cred = 0;
                } else if (!list_empty (&conn->ibc_tx_queue)) {
                        tx = list_entry (conn->ibc_tx_queue.next, 
                                         kib_tx_t, tx_list);
                        consume_cred = 1;
                } else {
                        /* nothing waiting */
                        break;
                }

                LASSERT (tx->tx_queued);
                /* We rely on this for QP sizing */
                LASSERT (tx->tx_nwrq > 0 && tx->tx_nwrq <= 1 + IBNAL_MAX_RDMA_FRAGS);

                LASSERT (conn->ibc_outstanding_credits >= 0);
                LASSERT (conn->ibc_outstanding_credits <= IBNAL_MSG_QUEUE_SIZE);
                LASSERT (conn->ibc_credits >= 0);
                LASSERT (conn->ibc_credits <= IBNAL_MSG_QUEUE_SIZE);

                /* Send queue full: tx completions will call me again */
                if (conn->ibc_nsends_posted ==
                    *kibnal_tunables.kib_concurrent_sends) {
                        /* We've got some tx completions outstanding... */
                        CDEBUG(D_NET, "%s: posted enough\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        break;
                }

                if (consume_cred) {
                        if (conn->ibc_credits == 0) {   /* no credits */
                                CDEBUG(D_NET, "%s: no credits\n",
                                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                                break;
                        }
                        
                        if (conn->ibc_credits == 1 &&   /* last credit reserved for */
                            conn->ibc_outstanding_credits == 0) { /* giving back credits */
                                CDEBUG(D_NET, "%s: not using last credit\n",
                                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                                break;
                        }
                }
                
                list_del (&tx->tx_list);
                tx->tx_queued = 0;

                /* NB don't drop ibc_lock before bumping tx_sending */

                /* A queued NOOP is redundant if real traffic (which also
                 * piggybacks credits) is now waiting, or credits no longer
                 * need returning: just discard it */
                if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
                    (!list_empty(&conn->ibc_tx_queue) ||
                     !list_empty(&conn->ibc_tx_queue_nocred) ||
                     (conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER &&
                      !kibnal_send_keepalive(conn)))) {
                        /* redundant NOOP */
                        spin_unlock(&conn->ibc_lock);
                        kibnal_tx_done(tx);
                        spin_lock(&conn->ibc_lock);
                        CDEBUG(D_NET, "%s: redundant noop\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        continue;
                }

                /* All outstanding credits are returned in this message */
                kibnal_pack_msg(tx->tx_msg, conn->ibc_version,
                                conn->ibc_outstanding_credits,
                                conn->ibc_peer->ibp_nid, conn->ibc_incarnation,
                                conn->ibc_txseq);

                conn->ibc_txseq++;
                conn->ibc_outstanding_credits = 0;
                conn->ibc_nsends_posted++;
                if (consume_cred)
                        conn->ibc_credits--;

                /* CAVEAT EMPTOR!  This tx could be the PUT_DONE of an RDMA
                 * PUT.  If so, it was first queued here as a PUT_REQ, sent and
                 * stashed on ibc_active_txs, matched by an incoming PUT_ACK,
                 * and then re-queued here.  It's (just) possible that
                 * tx_sending is non-zero if we've not done the tx_complete() from
                 * the first send; hence the ++ rather than = below. */
                tx->tx_sending++;

                list_add (&tx->tx_list, &conn->ibc_active_txs);

                LASSERT (tx->tx_nwrq > 0);

                rc = 0;
                frc = FSUCCESS;
                if (conn->ibc_state != IBNAL_CONN_ESTABLISHED) {
                        rc = -ECONNABORTED;
                } else {
                        frc = iba_post_send2(conn->ibc_qp, tx->tx_wrq, NULL);
                        if (frc != FSUCCESS)
                                rc = -EIO;
                }

                conn->ibc_last_send = jiffies;

                if (rc != 0) {
                        /* Post failed: undo the accounting above, complete
                         * the tx if this was its only send, and close the
                         * connection. */
                        /* NB credits are transferred in the actual
                         * message, which can only be the last work item */
                        conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
                        if (consume_cred)
                                conn->ibc_credits++;
                        conn->ibc_nsends_posted--;

                        tx->tx_status = rc;
                        tx->tx_waiting = 0;
                        tx->tx_sending--;
                        
                        done = (tx->tx_sending == 0);
                        if (done)
                                list_del (&tx->tx_list);
                        
                        spin_unlock(&conn->ibc_lock);
                        
                        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
                                CERROR ("Error %d posting transmit to %s\n", 
                                        frc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        else
                                CDEBUG (D_NET, "Error %d posting transmit to %s\n",
                                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));

                        kibnal_close_conn (conn, rc);

                        if (done)
                                kibnal_tx_done (tx);
                        return;
                }
        }

        spin_unlock(&conn->ibc_lock);
}
1019
/* CQ completion handler for a TX work request.  Drops this send's
 * reference on the tx; if that makes it idle (no sends in flight, not
 * waiting for the peer, not re-queued) the tx is finalised.  A failed
 * completion also marks the tx failed and closes the connection. */
void
kibnal_tx_complete (IB_WORK_COMPLETION *wc)
{
        kib_tx_t     *tx = (kib_tx_t *)kibnal_wreqid2ptr(wc->WorkReqId);
        kib_conn_t   *conn = tx->tx_conn;
        int           failed = wc->Status != WRStatusSuccess;
        int           idle;

        CDEBUG(D_NET, "%s: sending %d nwrq %d status %d\n", 
               libcfs_nid2str(conn->ibc_peer->ibp_nid),
               tx->tx_sending, tx->tx_nwrq, wc->Status);

        LASSERT (tx->tx_sending > 0);

        /* Log details on the first failure seen on a live connection
         * (tx_status still 0); later errors are expected fallout */
        if (failed &&
            tx->tx_status == 0 &&
            conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
#if KIBLND_DETAILED_DEBUG
                int                   i;
                IB_WORK_REQ2         *wrq = &tx->tx_wrq[0];
                IB_LOCAL_DATASEGMENT *gl = &tx->tx_gl[0];
                lnet_msg_t           *lntmsg = tx->tx_lntmsg[0];
#endif
                CDEBUG(D_NETERROR, "tx -> %s type %x cookie "LPX64
                       " sending %d waiting %d failed %d nwrk %d\n", 
                       libcfs_nid2str(conn->ibc_peer->ibp_nid),
                       tx->tx_msg->ibm_type, tx->tx_cookie,
                       tx->tx_sending, tx->tx_waiting, wc->Status,
                       tx->tx_nwrq);
#if KIBLND_DETAILED_DEBUG
                /* Dump every work request and its data segment */
                for (i = 0; i < tx->tx_nwrq; i++, wrq++, gl++) {
                        switch (wrq->Operation) {
                        default:
                                CDEBUG(D_NETERROR, "    [%3d] Addr %p Next %p OP %d "
                                       "DSList %p(%p)/%d: "LPX64"/%d K %x\n",
                                       i, wrq, wrq->Next, wrq->Operation,
                                       wrq->DSList, gl, wrq->DSListDepth,
                                       gl->Address, gl->Length, gl->Lkey);
                                break;
                        case WROpSend:
                                CDEBUG(D_NETERROR, "    [%3d] Addr %p Next %p SEND "
                                       "DSList %p(%p)/%d: "LPX64"/%d K %x\n",
                                       i, wrq, wrq->Next, 
                                       wrq->DSList, gl, wrq->DSListDepth,
                                       gl->Address, gl->Length, gl->Lkey);
                                break;
                        case WROpRdmaWrite:
                                CDEBUG(D_NETERROR, "    [%3d] Addr %p Next %p DMA "
                                       "DSList: %p(%p)/%d "LPX64"/%d K %x -> "
                                       LPX64" K %x\n",
                                       i, wrq, wrq->Next, 
                                       wrq->DSList, gl, wrq->DSListDepth,
                                       gl->Address, gl->Length, gl->Lkey,
                                       wrq->Req.SendRC.RemoteDS.Address,
                                       wrq->Req.SendRC.RemoteDS.Rkey);
                                break;
                        }
                }
                
                /* For RDMA completions, also dump the local fragments and
                 * the originating LNet message's buffers */
                switch (tx->tx_msg->ibm_type) {
                default:
                        CDEBUG(D_NETERROR, "  msg type %x %p/%d, No RDMA\n", 
                               tx->tx_msg->ibm_type, 
                               tx->tx_msg, tx->tx_msg->ibm_nob);
                        break;

                case IBNAL_MSG_PUT_DONE:
                case IBNAL_MSG_GET_DONE:
                        CDEBUG(D_NETERROR, "  msg type %x %p/%d, RDMA key %x frags %d...\n", 
                               tx->tx_msg->ibm_type, 
                               tx->tx_msg, tx->tx_msg->ibm_nob,
                               tx->tx_rd->rd_key, tx->tx_rd->rd_nfrag);
                        for (i = 0; i < tx->tx_rd->rd_nfrag; i++)
                                CDEBUG(D_NETERROR, "    [%d] "LPX64"/%d\n", i,
                                       tx->tx_rd->rd_frags[i].rf_addr,
                                       tx->tx_rd->rd_frags[i].rf_nob);
                        if (lntmsg == NULL) {
                                CDEBUG(D_NETERROR, "  No lntmsg\n");
                        } else if (lntmsg->msg_iov != NULL) {
                                CDEBUG(D_NETERROR, "  lntmsg in %d VIRT frags...\n", 
                                       lntmsg->msg_niov);
                                for (i = 0; i < lntmsg->msg_niov; i++)
                                        CDEBUG(D_NETERROR, "    [%d] %p/%d\n", i,
                                               lntmsg->msg_iov[i].iov_base,
                                               lntmsg->msg_iov[i].iov_len);
                        } else if (lntmsg->msg_kiov != NULL) {
                                CDEBUG(D_NETERROR, "  lntmsg in %d PAGE frags...\n", 
                                       lntmsg->msg_niov);
                                for (i = 0; i < lntmsg->msg_niov; i++)
                                        CDEBUG(D_NETERROR, "    [%d] %p+%d/%d\n", i,
                                               lntmsg->msg_kiov[i].kiov_page,
                                               lntmsg->msg_kiov[i].kiov_offset,
                                               lntmsg->msg_kiov[i].kiov_len);
                        } else {
                                CDEBUG(D_NETERROR, "  lntmsg in %d frags\n", 
                                       lntmsg->msg_niov);
                        }
                        
                        break;
                }
#endif
        }
        
        spin_lock(&conn->ibc_lock);

        /* I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'. */

        tx->tx_sending--;
        conn->ibc_nsends_posted--;

        if (failed) {
                tx->tx_waiting = 0;             /* don't wait for peer */
                tx->tx_status = -EIO;
        }
        
        idle = (tx->tx_sending == 0) &&         /* This is the final callback */
               !tx->tx_waiting &&               /* Not waiting for peer */
               !tx->tx_queued;                  /* Not re-queued (PUT_DONE) */
        if (idle)
                list_del(&tx->tx_list);

        kibnal_conn_addref(conn);               /* 1 ref for me.... */

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kibnal_tx_done (tx);

        if (failed) {
                kibnal_close_conn (conn, -EIO);
        } else {
                kibnal_peer_alive(conn->ibc_peer);
                kibnal_check_sends(conn);
        }

        kibnal_conn_decref(conn);               /* ...until here */
}
1158
/* Append the SEND work request that carries tx's message buffer ('type'
 * with 'body_nob' bytes of payload).  This must be the LAST work request
 * added to the tx: it terminates the WR chain (Next = NULL) and is the
 * one signalled completion. */
void
kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob)
{
        IB_LOCAL_DATASEGMENT *gl = &tx->tx_gl[tx->tx_nwrq];
        IB_WORK_REQ2         *wrq = &tx->tx_wrq[tx->tx_nwrq];
        int                   nob = offsetof (kib_msg_t, ibm_u) + body_nob;

        /* room for one more WR (QP was sized for 1 + IBNAL_MAX_RDMA_FRAGS) */
        LASSERT (tx->tx_nwrq >= 0 && 
                 tx->tx_nwrq < (1 + IBNAL_MAX_RDMA_FRAGS));
        LASSERT (nob <= IBNAL_MSG_SIZE);

        kibnal_init_msg(tx->tx_msg, type, body_nob);

        /* local data segment covers the whole pre-mapped message buffer */
        *gl = (IB_LOCAL_DATASEGMENT) {
                .Address = tx->tx_hca_msg,
                .Length  = IBNAL_MSG_SIZE,
                .Lkey    = kibnal_data.kib_whole_mem.md_lkey,
        };

        wrq->Next           = NULL;             /* This is the last one */

        wrq->WorkReqId      = kibnal_ptr2wreqid(tx, IBNAL_WID_TX);
        wrq->Operation      = WROpSend;
        wrq->DSList         = gl;
        wrq->DSListDepth    = 1;
        wrq->MessageLen     = nob;
        wrq->Req.SendRC.ImmediateData  = 0;
        wrq->Req.SendRC.Options.s.SolicitedEvent         = 1;
        wrq->Req.SendRC.Options.s.SignaledCompletion     = 1;
        wrq->Req.SendRC.Options.s.ImmediateData          = 0;
        wrq->Req.SendRC.Options.s.Fence                  = 0; 
        /* fence only needed on RDMA reads */
        
        tx->tx_nwrq++;
}
1194
/* Set up the RDMA-write work requests that move 'nob' bytes from this
 * tx's source descriptor (tx->tx_rd) to the peer's 'dstrd', then append
 * the GET_DONE/PUT_DONE completion message (carrying 'dstcookie' so the
 * peer can match it).  Returns the number of bytes to be transferred, or
 * a negative errno; on failure the completion message is still sent (with
 * the error in ibcm_status) but no RDMA is performed. */
int
kibnal_init_rdma (kib_tx_t *tx, int type, int nob,
                  kib_rdma_desc_t *dstrd, __u64 dstcookie)
{
        kib_msg_t            *ibmsg = tx->tx_msg;
        kib_rdma_desc_t      *srcrd = tx->tx_rd;
        IB_LOCAL_DATASEGMENT *gl;
        IB_WORK_REQ2         *wrq;
        int                   rc;

#if IBNAL_USE_FMR
        /* FMR: both sides are single virtually-contiguous regions, so one
         * RDMA-write WR suffices */
        LASSERT (tx->tx_nwrq == 0);

        gl = &tx->tx_gl[0];
        gl->Length  = nob;
        gl->Address = srcrd->rd_addr;
        gl->Lkey    = srcrd->rd_key;

        wrq = &tx->tx_wrq[0];

        /* chain to the completion-message WR appended below */
        wrq->Next           = wrq + 1;
        wrq->WorkReqId      = kibnal_ptr2wreqid(tx, IBNAL_WID_RDMA);
        wrq->Operation      = WROpRdmaWrite;
        wrq->DSList         = gl;
        wrq->DSListDepth    = 1;
        wrq->MessageLen     = nob;

        wrq->Req.SendRC.ImmediateData                = 0;
        wrq->Req.SendRC.Options.s.SolicitedEvent     = 0;
        wrq->Req.SendRC.Options.s.SignaledCompletion = 0;
        wrq->Req.SendRC.Options.s.ImmediateData      = 0;
        wrq->Req.SendRC.Options.s.Fence              = 0; 

        wrq->Req.SendRC.RemoteDS.Address = dstrd->rd_addr;
        wrq->Req.SendRC.RemoteDS.Rkey    = dstrd->rd_key;

        tx->tx_nwrq = 1;
        rc = nob;
#else
        /* CAVEAT EMPTOR: this 'consumes' the frags in 'dstrd' */
        int              resid = nob;
        kib_rdma_frag_t *srcfrag;
        int              srcidx;
        kib_rdma_frag_t *dstfrag;
        int              dstidx;
        int              wrknob;

        /* Called by scheduler */
        LASSERT (!in_interrupt());

        LASSERT (type == IBNAL_MSG_GET_DONE ||
                 type == IBNAL_MSG_PUT_DONE);

        srcidx = dstidx = 0;
        srcfrag = &srcrd->rd_frags[0];
        dstfrag = &dstrd->rd_frags[0];
        rc = resid;

        /* Walk source and destination fragment lists in step, emitting
         * one RDMA-write WR per overlapping chunk */
        while (resid > 0) {
                if (srcidx >= srcrd->rd_nfrag) {
                        CERROR("Src buffer exhausted: %d frags\n", srcidx);
                        rc = -EPROTO;
                        break;
                }
                
                if (dstidx == dstrd->rd_nfrag) {
                        CERROR("Dst buffer exhausted: %d frags\n", dstidx);
                        rc = -EPROTO;
                        break;
                }

                if (tx->tx_nwrq == IBNAL_MAX_RDMA_FRAGS) {
                        CERROR("RDMA too fragmented: %d/%d src %d/%d dst frags\n",
                               srcidx, srcrd->rd_nfrag,
                               dstidx, dstrd->rd_nfrag);
                        rc = -EMSGSIZE;
                        break;
                }

                /* largest chunk both current frags (and the residual) allow */
                wrknob = MIN(MIN(srcfrag->rf_nob, dstfrag->rf_nob), resid);

                gl = &tx->tx_gl[tx->tx_nwrq];
                gl->Length  = wrknob;
                gl->Address = srcfrag->rf_addr;
                gl->Lkey    = srcrd->rd_key;

                wrq = &tx->tx_wrq[tx->tx_nwrq];

                /* chain to the next WR; kibnal_init_tx_msg() below
                 * terminates the chain */
                wrq->Next           = wrq + 1;
                wrq->WorkReqId      = kibnal_ptr2wreqid(tx, IBNAL_WID_RDMA);
                wrq->Operation      = WROpRdmaWrite;
                wrq->DSList         = gl;
                wrq->DSListDepth    = 1;
                /* NB MessageLen is the total 'nob' on every fragment WR,
                 * not 'wrknob' — presumably ignored for RDMA writes;
                 * confirm against the IbAccess WR spec */
                wrq->MessageLen     = nob;

                wrq->Req.SendRC.ImmediateData                = 0;
                wrq->Req.SendRC.Options.s.SolicitedEvent     = 0;
                wrq->Req.SendRC.Options.s.SignaledCompletion = 0;
                wrq->Req.SendRC.Options.s.ImmediateData      = 0;
                wrq->Req.SendRC.Options.s.Fence              = 0; 

                wrq->Req.SendRC.RemoteDS.Address = dstfrag->rf_addr;
                wrq->Req.SendRC.RemoteDS.Rkey    = dstrd->rd_key;

                /* advance whichever frag(s) this chunk used up */
                resid -= wrknob;
                if (wrknob < srcfrag->rf_nob) {
                        srcfrag->rf_addr += wrknob;
                        srcfrag->rf_nob -= wrknob;
                } else {
                        srcfrag++;
                        srcidx++;
                }
                
                if (wrknob < dstfrag->rf_nob) {
                        dstfrag->rf_addr += wrknob;
                        dstfrag->rf_nob -= wrknob;
                } else {
                        dstfrag++;
                        dstidx++;
                }
                
                tx->tx_nwrq++;
        }

        if (rc < 0)                             /* no RDMA if completing with failure */
                tx->tx_nwrq = 0;
#endif
        
        /* Append the GET_DONE/PUT_DONE message telling the peer the result */
        ibmsg->ibm_u.completion.ibcm_status = rc;
        ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;
        kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));

        return rc;
}
1329
/* Queue 'tx' on 'conn' under ibc_lock, then kick the sender.
 * NB kibnal_check_sends() re-takes ibc_lock itself, hence the unlock
 * before calling it. */
void
kibnal_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
{
        spin_lock(&conn->ibc_lock);
        kibnal_queue_tx_locked (tx, conn);
        spin_unlock(&conn->ibc_lock);
        
        kibnal_check_sends(conn);
}
1339
/* Hand 'peer' to the connection daemon to establish a connection using
 * protocol 'proto_version'.  Takes an extra peer ref which the connd
 * owns until it completes the attempt. */
void
kibnal_schedule_active_connect_locked (kib_peer_t *peer, int proto_version)
{
        /* Called holding kib_global_lock exclusive with IRQs disabled */

        peer->ibp_version = proto_version;      /* proto version for new conn */
        peer->ibp_connecting++;                 /* I'm connecting */
        kibnal_peer_addref(peer);               /* extra ref for connd */

        spin_lock(&kibnal_data.kib_connd_lock);

        list_add_tail (&peer->ibp_connd_list, &kibnal_data.kib_connd_peers);
        wake_up (&kibnal_data.kib_connd_waitq);

        spin_unlock(&kibnal_data.kib_connd_lock);
}
1356
/* Unlocked wrapper: take kib_global_lock exclusively (IRQs disabled, as
 * the locked variant requires) around kibnal_schedule_active_connect_locked() */
void
kibnal_schedule_active_connect (kib_peer_t *peer, int proto_version)
{
        unsigned long flags;

        write_lock_irqsave(&kibnal_data.kib_global_lock, flags);

        kibnal_schedule_active_connect_locked(peer, proto_version);

        write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
}
1368
/* Send 'tx' to 'nid': queue it on an existing connection if there is one,
 * otherwise create the peer and/or start connecting and queue the tx on
 * the peer until the connection is established.  Having committed to
 * send, any failure completes the tx with an error rather than returning
 * one. */
void
kibnal_launch_tx (kib_tx_t *tx, lnet_nid_t nid)
{
        kib_peer_t      *peer;
        kib_conn_t      *conn;
        unsigned long    flags;
        rwlock_t        *g_lock = &kibnal_data.kib_global_lock;
        int              retry;
        int              rc;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */
        
        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
        LASSERT (tx->tx_nwrq > 0);              /* work items have been set up */

        /* At most two passes: the second after adding a persistent peer */
        for (retry = 0; ; retry = 1) {
                read_lock_irqsave(g_lock, flags);
        
                /* Fast path: connection already exists */
                peer = kibnal_find_peer_locked (nid);
                if (peer != NULL) {
                        conn = kibnal_find_conn_locked (peer);
                        if (conn != NULL) {
                                kibnal_conn_addref(conn); /* 1 ref for me... */
                                read_unlock_irqrestore(g_lock, flags);

                                kibnal_queue_tx (tx, conn);
                                kibnal_conn_decref(conn); /* ...to here */
                                return;
                        }
                }
                
                /* Making one or more connections; I'll need a write lock...
                 * NB read_unlock/write_lock (not the _irq variants) keeps
                 * IRQs disabled throughout; 'flags' from read_lock_irqsave
                 * is restored by write_unlock_irqrestore below */
                read_unlock(g_lock);
                write_lock(g_lock);

                /* re-check: peer may have appeared while lock was dropped */
                peer = kibnal_find_peer_locked (nid);
                if (peer != NULL)
                        break;

                write_unlock_irqrestore(g_lock, flags);

                /* peer still missing after we added it: give up */
                if (retry) {
                        CERROR("Can't find peer %s\n", libcfs_nid2str(nid));

                        tx->tx_status = -EHOSTUNREACH;
                        tx->tx_waiting = 0;
                        kibnal_tx_done (tx);
                        return;
                }

                rc = kibnal_add_persistent_peer(nid);
                if (rc != 0) {
                        CERROR("Can't add peer %s: %d\n",
                               libcfs_nid2str(nid), rc);
                        
                        tx->tx_status = -EHOSTUNREACH;
                        tx->tx_waiting = 0;
                        kibnal_tx_done (tx);
                        return;
                }
        }

        /* Holding the write lock; peer exists but had no conn when checked
         * under the read lock — look again */
        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kibnal_conn_addref(conn);       /* 1 ref for me... */
                write_unlock_irqrestore(g_lock, flags);
                
                kibnal_queue_tx (tx, conn);
                kibnal_conn_decref(conn);       /* ...until here */
                return;
        }

        if (!kibnal_peer_connecting(peer)) {
                /* Rate-limit reconnection attempts */
                if (!(peer->ibp_reconnect_interval == 0 || /* first attempt */
                      time_after_eq(jiffies, peer->ibp_reconnect_time))) {
                        write_unlock_irqrestore(g_lock, flags);
                        tx->tx_status = -EHOSTUNREACH;
                        tx->tx_waiting = 0;
                        kibnal_tx_done (tx);
                        return;
                }

                kibnal_schedule_active_connect_locked(peer, IBNAL_MSG_VERSION);
        }
        
        /* A connection is being established; queue the message... */
        list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}
1461
1462 void
1463 kibnal_txlist_done (struct list_head *txlist, int status)
1464 {
1465         kib_tx_t *tx;
1466
1467         while (!list_empty (txlist)) {
1468                 tx = list_entry (txlist->next, kib_tx_t, tx_list);
1469
1470                 list_del (&tx->tx_list);
1471                 /* complete now */
1472                 tx->tx_waiting = 0;
1473                 tx->tx_status = status;
1474                 kibnal_tx_done (tx);
1475         }
1476 }
1477
1478 int
1479 kibnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1480 {
1481         lnet_hdr_t       *hdr = &lntmsg->msg_hdr; 
1482         int               type = lntmsg->msg_type; 
1483         lnet_process_id_t target = lntmsg->msg_target;
1484         int               target_is_router = lntmsg->msg_target_is_router;
1485         int               routing = lntmsg->msg_routing;
1486         unsigned int      payload_niov = lntmsg->msg_niov; 
1487         struct iovec     *payload_iov = lntmsg->msg_iov; 
1488         lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
1489         unsigned int      payload_offset = lntmsg->msg_offset;
1490         unsigned int      payload_nob = lntmsg->msg_len;
1491         kib_msg_t        *ibmsg;
1492         kib_tx_t         *tx;
1493         int               nob;
1494         int               rc;
1495
1496         /* NB 'private' is different depending on what we're sending.... */
1497
1498         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1499                payload_nob, payload_niov, libcfs_id2str(target));
1500
1501         LASSERT (payload_nob == 0 || payload_niov > 0);
1502         LASSERT (payload_niov <= LNET_MAX_IOV);
1503
1504         /* Thread context */
1505         LASSERT (!in_interrupt());
1506         /* payload is either all vaddrs or all pages */
1507         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1508
1509         switch (type) {
1510         default:
1511                 LBUG();
1512                 return (-EIO);
1513                 
1514         case LNET_MSG_ACK:
1515                 LASSERT (payload_nob == 0);
1516                 break;
1517
1518         case LNET_MSG_GET:
1519                 if (routing || target_is_router)
1520                         break;                  /* send IMMEDIATE */
1521                 
1522                 /* is the REPLY message too small for RDMA? */
1523                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
1524                 if (nob <= IBNAL_MSG_SIZE)
1525                         break;                  /* send IMMEDIATE */
1526
1527                 tx = kibnal_get_idle_tx();
1528                 if (tx == NULL) {
1529                         CERROR("Can allocate txd for GET to %s: \n",
1530                                libcfs_nid2str(target.nid));
1531                         return -ENOMEM;
1532                 }
1533                 
1534                 ibmsg = tx->tx_msg;
1535                 ibmsg->ibm_u.get.ibgm_hdr = *hdr;
1536                 ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie;
1537
1538                 if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
1539                         rc = kibnal_setup_rd_iov(tx, &ibmsg->ibm_u.get.ibgm_rd,
1540                                                  0,
1541                                                  lntmsg->msg_md->md_niov,
1542                                                  lntmsg->msg_md->md_iov.iov,
1543                                                  0, lntmsg->msg_md->md_length);
1544                 else
1545                         rc = kibnal_setup_rd_kiov(tx, &ibmsg->ibm_u.get.ibgm_rd,
1546                                                   0,
1547                                                   lntmsg->msg_md->md_niov,
1548                                                   lntmsg->msg_md->md_iov.kiov,
1549                                                   0, lntmsg->msg_md->md_length);
1550                 if (rc != 0) {
1551                         CERROR("Can't setup GET sink for %s: %d\n",
1552                                libcfs_nid2str(target.nid), rc);
1553                         kibnal_tx_done(tx);
1554                         return -EIO;
1555                 }
1556
1557 #if IBNAL_USE_FMR
1558                 nob = sizeof(kib_get_msg_t);
1559 #else
1560                 {
1561                         int n = ibmsg->ibm_u.get.ibgm_rd.rd_nfrag;
1562                         
1563                         nob = offsetof(kib_get_msg_t, ibgm_rd.rd_frags[n]);
1564                 }
1565 #endif
1566                 kibnal_init_tx_msg(tx, IBNAL_MSG_GET_REQ, nob);
1567
1568                 tx->tx_lntmsg[1] = lnet_create_reply_msg(kibnal_data.kib_ni,
1569                                                          lntmsg);
1570                 if (tx->tx_lntmsg[1] == NULL) {
1571                         CERROR("Can't create reply for GET -> %s\n",
1572                                libcfs_nid2str(target.nid));
1573                         kibnal_tx_done(tx);
1574                         return -EIO;
1575                 }
1576
1577                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg[0,1] on completion */
1578                 tx->tx_waiting = 1;             /* waiting for GET_DONE */
1579                 kibnal_launch_tx(tx, target.nid);
1580                 return 0;
1581
1582         case LNET_MSG_REPLY: 
1583         case LNET_MSG_PUT:
1584                 /* Is the payload small enough not to need RDMA? */
1585                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1586                 if (nob <= IBNAL_MSG_SIZE)
1587                         break;                  /* send IMMEDIATE */
1588
1589                 tx = kibnal_get_idle_tx();
1590                 if (tx == NULL) {
1591                         CERROR("Can't allocate %s txd for %s\n",
1592                                type == LNET_MSG_PUT ? "PUT" : "REPLY",
1593                                libcfs_nid2str(target.nid));
1594                         return -ENOMEM;
1595                 }
1596
1597                 if (payload_kiov == NULL)
1598                         rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 1,
1599                                                  payload_niov, payload_iov,
1600                                                  payload_offset, payload_nob);
1601                 else
1602                         rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 1,
1603                                                   payload_niov, payload_kiov,
1604                                                   payload_offset, payload_nob);
1605                 if (rc != 0) {
1606                         CERROR("Can't setup PUT src for %s: %d\n",
1607                                libcfs_nid2str(target.nid), rc);
1608                         kibnal_tx_done(tx);
1609                         return -EIO;
1610                 }
1611
1612                 ibmsg = tx->tx_msg;
1613                 ibmsg->ibm_u.putreq.ibprm_hdr = *hdr;
1614                 ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie;
1615                 kibnal_init_tx_msg(tx, IBNAL_MSG_PUT_REQ, sizeof(kib_putreq_msg_t));
1616
1617                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1618                 tx->tx_waiting = 1;             /* waiting for PUT_{ACK,NAK} */
1619                 kibnal_launch_tx(tx, target.nid);
1620                 return 0;
1621         }
1622
1623         /* send IMMEDIATE */
1624
1625         LASSERT (offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob])
1626                  <= IBNAL_MSG_SIZE);
1627
1628         tx = kibnal_get_idle_tx();
1629         if (tx == NULL) {
1630                 CERROR ("Can't send %d to %s: tx descs exhausted\n",
1631                         type, libcfs_nid2str(target.nid));
1632                 return -ENOMEM;
1633         }
1634
1635         ibmsg = tx->tx_msg;
1636         ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1637
1638         if (payload_kiov != NULL)
1639                 lnet_copy_kiov2flat(IBNAL_MSG_SIZE, ibmsg,
1640                                     offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1641                                     payload_niov, payload_kiov,
1642                                     payload_offset, payload_nob);
1643         else
1644                 lnet_copy_iov2flat(IBNAL_MSG_SIZE, ibmsg,
1645                                    offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1646                                    payload_niov, payload_iov,
1647                                    payload_offset, payload_nob);
1648
1649         nob = offsetof(kib_immediate_msg_t, ibim_payload[payload_nob]);
1650         kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE, nob);
1651
1652         tx->tx_lntmsg[0] = lntmsg;              /* finalise lntmsg on completion */
1653         kibnal_launch_tx(tx, target.nid);
1654         return 0;
1655 }
1656
/* Reply to an "optimized GET": RDMA the payload of 'lntmsg' straight into
 * the sink descriptor the peer sent in its GET_REQ (carried by 'rx'),
 * completing with a GET_DONE message.
 *
 * On any failure the tx (if one was allocated) is released and lntmsg is
 * finalised with -EIO so LNet knows the reply never went out. */
void
kibnal_reply(lnet_ni_t *ni, kib_rx_t *rx, lnet_msg_t *lntmsg)
{
        lnet_process_id_t target = lntmsg->msg_target;
        unsigned int      niov = lntmsg->msg_niov;
        struct iovec     *iov = lntmsg->msg_iov;
        lnet_kiov_t      *kiov = lntmsg->msg_kiov;
        unsigned int      offset = lntmsg->msg_offset;
        unsigned int      nob = lntmsg->msg_len;
        kib_tx_t         *tx;
        int               rc;

        tx = kibnal_get_idle_tx();
        if (tx == NULL) {
                CERROR("Can't get tx for REPLY to %s\n",
                       libcfs_nid2str(target.nid));
                goto failed_0;
        }

        /* Payload is described by pages (kiov) or virtual addresses (iov),
         * never both; a zero-length reply needs no source descriptor. */
        if (nob == 0)
                rc = 0;
        else if (kiov == NULL)
                rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 1, 
                                         niov, iov, offset, nob);
        else
                rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 1, 
                                          niov, kiov, offset, nob);

        if (rc != 0) {
                CERROR("Can't setup GET src for %s: %d\n",
                       libcfs_nid2str(target.nid), rc);
                goto failed_1;
        }
        
        /* kibnal_init_rdma() returns < 0 on error, 0 if no RDMA work was
         * needed (zero-length transfer), > 0 if RDMA was set up. */
        rc = kibnal_init_rdma(tx, IBNAL_MSG_GET_DONE, nob,
                              &rx->rx_msg->ibm_u.get.ibgm_rd,
                              rx->rx_msg->ibm_u.get.ibgm_cookie);
        if (rc < 0) {
                CERROR("Can't setup rdma for GET from %s: %d\n", 
                       libcfs_nid2str(target.nid), rc);
                goto failed_1;
        }
        
        if (rc == 0) {
                /* No RDMA: local completion may happen now! */
                lnet_finalize(ni, lntmsg, 0);
        } else {
                /* RDMA: lnet_finalize(lntmsg) when it
                 * completes */
                tx->tx_lntmsg[0] = lntmsg;
        }
        
        kibnal_queue_tx(tx, rx->rx_conn);
        return;
        
 failed_1:
        kibnal_tx_done(tx);
 failed_0:
        lnet_finalize(ni, lntmsg, -EIO);
}
1717
1718 int
1719 kibnal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
1720                    void **new_private)
1721 {
1722         kib_rx_t    *rx = private;
1723         kib_conn_t  *conn = rx->rx_conn;
1724
1725         if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {
1726                 /* Can't block if RDMA completions need normal credits */
1727                 LCONSOLE_ERROR_MSG(0x12d,  "Dropping message from %s: no "
1728                                    "buffers free. %s is running an old version"
1729                                    " of LNET that may deadlock if messages "
1730                                    "wait for buffers)\n",
1731                                    libcfs_nid2str(conn->ibc_peer->ibp_nid),
1732                                    libcfs_nid2str(conn->ibc_peer->ibp_nid));
1733                 return -EDEADLK;
1734         }
1735         
1736         *new_private = private;
1737         return 0;
1738 }
1739
/* LND recv callback: deliver an incoming message held in 'rx' (private)
 * into the receive buffers LNet describes with iov/kiov, offset, mlen.
 *
 * IMMEDIATE: payload is copied straight out of the rx buffer.
 * PUT_REQ:   set up a sink and send PUT_ACK so the peer can RDMA the data
 *            (or PUT_NAK if mlen == 0 or setup fails).
 * GET_REQ:   either RDMA the reply payload back (optimized GET) or send
 *            GET_DONE(-ENODATA) when nothing matched.
 *
 * Returns 0 on success or a negative errno; always re-posts the rx,
 * returning a credit unless the peer still owns it (see post_cred). */
int
kibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
             unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
             unsigned int offset, unsigned int mlen, unsigned int rlen)
{
        kib_rx_t    *rx = private;
        kib_msg_t   *rxmsg = rx->rx_msg;
        kib_conn_t  *conn = rx->rx_conn;
        kib_tx_t    *tx;
        kib_msg_t   *txmsg;
        int          nob;
        int          post_cred = 1;
        int          rc = 0;
        
        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch (rxmsg->ibm_type) {
        default:
                LBUG();
                
        case IBNAL_MSG_IMMEDIATE:
                /* Whole payload arrived inline; verify the sender's claim
                 * of rlen fits in what was actually received. */
                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
                if (nob > rx->rx_nob) {
                        CERROR ("Immediate message from %s too big: %d(%d)\n",
                                libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
                                nob, rx->rx_nob);
                        rc = -EPROTO;
                        break;
                }

                if (kiov != NULL)
                        lnet_copy_flat2kiov(niov, kiov, offset,
                                            IBNAL_MSG_SIZE, rxmsg,
                                            offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                            mlen);
                else
                        lnet_copy_flat2iov(niov, iov, offset,
                                           IBNAL_MSG_SIZE, rxmsg,
                                           offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                           mlen);
                lnet_finalize (ni, lntmsg, 0);
                break;

        case IBNAL_MSG_PUT_REQ:
                if (mlen == 0) {
                        /* Nothing wanted: complete locally and NAK so the
                         * peer doesn't wait to RDMA. */
                        lnet_finalize(ni, lntmsg, 0);
                        kibnal_send_completion(rx->rx_conn, IBNAL_MSG_PUT_NAK, 0,
                                               rxmsg->ibm_u.putreq.ibprm_cookie);
                        break;
                }
                
                tx = kibnal_get_idle_tx();
                if (tx == NULL) {
                        CERROR("Can't allocate tx for %s\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        /* Not replying will break the connection */
                        rc = -ENOMEM;
                        break;
                }

                /* Describe the sink buffers in the PUT_ACK so the peer can
                 * RDMA-write directly into them. */
                txmsg = tx->tx_msg;
                if (kiov == NULL)
                        rc = kibnal_setup_rd_iov(tx, 
                                                 &txmsg->ibm_u.putack.ibpam_rd,
                                                 0,
                                                 niov, iov, offset, mlen);
                else
                        rc = kibnal_setup_rd_kiov(tx,
                                                  &txmsg->ibm_u.putack.ibpam_rd,
                                                  0,
                                                  niov, kiov, offset, mlen);
                if (rc != 0) {
                        CERROR("Can't setup PUT sink for %s: %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
                        kibnal_tx_done(tx);
                        /* tell peer it's over */
                        kibnal_send_completion(rx->rx_conn, IBNAL_MSG_PUT_NAK, rc,
                                               rxmsg->ibm_u.putreq.ibprm_cookie);
                        break;
                }

                txmsg->ibm_u.putack.ibpam_src_cookie = rxmsg->ibm_u.putreq.ibprm_cookie;
                txmsg->ibm_u.putack.ibpam_dst_cookie = tx->tx_cookie;
#if IBNAL_USE_FMR
                nob = sizeof(kib_putack_msg_t);
#else
                {
                        /* Message size depends on how many fragments the
                         * sink descriptor actually used. */
                        int n = tx->tx_msg->ibm_u.putack.ibpam_rd.rd_nfrag;

                        nob = offsetof(kib_putack_msg_t, ibpam_rd.rd_frags[n]);
                }
#endif
                kibnal_init_tx_msg(tx, IBNAL_MSG_PUT_ACK, nob);

                tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
                tx->tx_waiting = 1;             /* waiting for PUT_DONE */
                kibnal_queue_tx(tx, conn);

                if (conn->ibc_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD)
                        post_cred = 0; /* peer still owns 'rx' for sending PUT_DONE */
                break;

        case IBNAL_MSG_GET_REQ:
                if (lntmsg != NULL) {
                        /* Optimized GET; RDMA lntmsg's payload */
                        kibnal_reply(ni, rx, lntmsg);
                } else {
                        /* GET didn't match anything */
                        kibnal_send_completion(rx->rx_conn, IBNAL_MSG_GET_DONE, 
                                               -ENODATA,
                                               rxmsg->ibm_u.get.ibgm_cookie);
                }
                break;
        }

        kibnal_post_rx(rx, post_cred, 0);
        return rc;
}
1861
1862 int
1863 kibnal_thread_start (int (*fn)(void *arg), void *arg)
1864 {
1865         long    pid = kernel_thread (fn, arg, 0);
1866
1867         if (pid < 0)
1868                 return ((int)pid);
1869
1870         atomic_inc (&kibnal_data.kib_nthreads);
1871         return (0);
1872 }
1873
/* Called by each LND thread as it exits; pairs with the atomic_inc in
 * kibnal_thread_start() so shutdown can wait for kib_nthreads == 0. */
void
kibnal_thread_fini (void)
{
        atomic_dec (&kibnal_data.kib_nthreads);
}
1879
/* Record that traffic from 'peer' was just seen, for health reporting in
 * kibnal_peer_notify().  Written locklessly by design (see comment). */
void
kibnal_peer_alive (kib_peer_t *peer)
{
        /* This is racy, but everyone's only writing cfs_time_current() */
        peer->ibp_last_alive = cfs_time_current();
        mb();           /* ensure the timestamp is visible to other CPUs */
}
1887
/* If 'peer' has fully failed (no conns, no connection attempts in
 * flight, and a pending error), report it dead to LNet along with the
 * wall-clock time it was last known alive.  The error is consumed
 * (cleared) so each failure is reported only once. */
void
kibnal_peer_notify (kib_peer_t *peer)
{
        time_t        last_alive = 0;
        int           error = 0;
        unsigned long flags;
        
        read_lock_irqsave(&kibnal_data.kib_global_lock, flags);

        if (list_empty(&peer->ibp_conns) &&
            peer->ibp_accepting == 0 &&
            peer->ibp_connecting == 0 &&
            peer->ibp_error != 0) {
                error = peer->ibp_error;
                peer->ibp_error = 0;
                /* convert the jiffies-based ibp_last_alive into seconds ago,
                 * relative to current wall-clock time */
                last_alive = cfs_time_current_sec() -
                             cfs_duration_sec(cfs_time_current() -
                                              peer->ibp_last_alive);
        }
        
        read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
        
        /* notify outside the lock; lnet_notify may block */
        if (error != 0)
                lnet_notify(kibnal_data.kib_ni, peer->ibp_nid, 0, last_alive);
}
1913
/* Hand 'conn' to the connd thread for further processing (e.g. finishing
 * a disconnect).  Takes a ref on behalf of the connd, which drops it when
 * done. */
void
kibnal_schedule_conn (kib_conn_t *conn)
{
        unsigned long flags;

        kibnal_conn_addref(conn);               /* ++ref for connd */
        
        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);

        list_add_tail (&conn->ibc_list, &kibnal_data.kib_connd_conns);
        wake_up (&kibnal_data.kib_connd_waitq);
                
        spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
}
1928
void
kibnal_close_conn_locked (kib_conn_t *conn, int error)
{
        /* This just does the immediate housekeeping to start shutdown of an
         * established connection.  'error' is zero for a normal shutdown.
         * Caller holds kib_global_lock exclusively in irq context */
        kib_peer_t       *peer = conn->ibc_peer;
        
        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);

        if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
                return; /* already being handled  */
        
        /* NB Can't take ibc_lock here (could be in IRQ context), without
         * risking deadlock, so access to ibc_{tx_queue,active_txs} is racey */

        /* Log at D_NET for a clean close (no error, nothing in flight),
         * D_NETERROR otherwise, noting which queues are still busy. */
        if (error == 0 &&
            list_empty(&conn->ibc_tx_queue) &&
            list_empty(&conn->ibc_tx_queue_rsrvd) &&
            list_empty(&conn->ibc_tx_queue_nocred) &&
            list_empty(&conn->ibc_active_txs)) {
                CDEBUG(D_NET, "closing conn to %s"
                       " rx# "LPD64" tx# "LPD64"\n", 
                       libcfs_nid2str(peer->ibp_nid),
                       conn->ibc_txseq, conn->ibc_rxseq);
        } else {
                CDEBUG(D_NETERROR, "Closing conn to %s: error %d%s%s%s%s"
                       " rx# "LPD64" tx# "LPD64"\n",
                       libcfs_nid2str(peer->ibp_nid), error,
                       list_empty(&conn->ibc_tx_queue) ? "" : "(sending)",
                       list_empty(&conn->ibc_tx_queue_rsrvd) ? "" : "(sending_rsrvd)",
                       list_empty(&conn->ibc_tx_queue_nocred) ? "" : "(sending_nocred)",
                       list_empty(&conn->ibc_active_txs) ? "" : "(waiting)",
                       conn->ibc_txseq, conn->ibc_rxseq);
#if 0
                /* can't skip down the queue without holding ibc_lock (see above) */
                list_for_each(tmp, &conn->ibc_tx_queue) {
                        kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);
                        
                        CERROR("   queued tx type %x cookie "LPX64
                               " sending %d waiting %d ticks %ld/%d\n", 
                               tx->tx_msg->ibm_type, tx->tx_cookie, 
                               tx->tx_sending, tx->tx_waiting,
                               (long)(tx->tx_deadline - jiffies), HZ);
                }

                list_for_each(tmp, &conn->ibc_active_txs) {
                        kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);
                        
                        CERROR("   active tx type %x cookie "LPX64
                               " sending %d waiting %d ticks %ld/%d\n", 
                               tx->tx_msg->ibm_type, tx->tx_cookie, 
                               tx->tx_sending, tx->tx_waiting,
                               (long)(tx->tx_deadline - jiffies), HZ);
                }
#endif
        }

        /* unhook from the peer's connection list */
        list_del (&conn->ibc_list);

        if (list_empty (&peer->ibp_conns)) {   /* no more conns */
                if (peer->ibp_persistence == 0 && /* non-persistent peer */
                    kibnal_peer_active(peer))     /* still in peer table */
                        kibnal_unlink_peer_locked (peer);

                peer->ibp_error = error; /* set/clear error on last conn */
        }

        kibnal_set_conn_state(conn, IBNAL_CONN_DISCONNECTING);

        /* connd completes the disconnect asynchronously */
        kibnal_schedule_conn(conn);
        kibnal_conn_decref(conn);               /* lose ibc_list's ref */
}
2002
/* Convenience wrapper: take kib_global_lock exclusively and start
 * shutting down 'conn'.  'error' == 0 means a normal shutdown. */
void
kibnal_close_conn (kib_conn_t *conn, int error)
{
        unsigned long flags;
        
        write_lock_irqsave(&kibnal_data.kib_global_lock, flags);

        kibnal_close_conn_locked (conn, error);
        
        write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
}
2014
/* Process any rxs that arrived before 'conn' reached the ESTABLISHED
 * state and were parked on ibc_early_rxs.  The global lock is dropped
 * around each kibnal_handle_rx() call since handling may block; the list
 * is therefore re-checked from the top after every iteration. */
void
kibnal_handle_early_rxs(kib_conn_t *conn)
{
        unsigned long    flags;
        kib_rx_t        *rx;

        LASSERT (!in_interrupt());
        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
        
        write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
        while (!list_empty(&conn->ibc_early_rxs)) {
                rx = list_entry(conn->ibc_early_rxs.next,
                                kib_rx_t, rx_list);
                list_del(&rx->rx_list);
                write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
                
                kibnal_handle_rx(rx);
                
                write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
        }
        write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
}
2037
/* Fail every tx on 'txs' (one of conn's tx lists) with -ECONNABORTED.
 * txs with sends still in flight are left where they are, marked failed;
 * their completion handlers finish them.  The rest are collected and
 * completed outside ibc_lock via kibnal_txlist_done(). */
void
kibnal_abort_txs(kib_conn_t *conn, struct list_head *txs)
{
        LIST_HEAD           (zombies); 
        struct list_head    *tmp;
        struct list_head    *nxt;
        kib_tx_t            *tx;

        spin_lock(&conn->ibc_lock);

        list_for_each_safe (tmp, nxt, txs) {
                tx = list_entry (tmp, kib_tx_t, tx_list);

                /* sanity: active txs are unqueued and in flight; everything
                 * else must still be queued */
                if (txs == &conn->ibc_active_txs) {
                        LASSERT (!tx->tx_queued);
                        LASSERT (tx->tx_waiting || tx->tx_sending != 0);
                } else {
                        LASSERT (tx->tx_queued);
                }
                
                tx->tx_status = -ECONNABORTED;
                tx->tx_queued = 0;
                tx->tx_waiting = 0;
                
                /* only txs with no send in flight can be completed here */
                if (tx->tx_sending == 0) {
                        list_del (&tx->tx_list);
                        list_add (&tx->tx_list, &zombies);
                }
        }

        spin_unlock(&conn->ibc_lock);

        kibnal_txlist_done(&zombies, -ECONNABORTED);
}
2072
2073 void
2074 kibnal_conn_disconnected(kib_conn_t *conn)
2075 {
2076         static IB_QP_ATTRIBUTES_MODIFY qpam = {.RequestState = QPStateError};
2077
2078         FSTATUS           frc;
2079
2080         LASSERT (conn->ibc_state >= IBNAL_CONN_INIT_QP);
2081
2082         kibnal_set_conn_state(conn, IBNAL_CONN_DISCONNECTED);
2083
2084         /* move QP to error state to make posted work items complete */
2085         frc = iba_modify_qp(conn->ibc_qp, &qpam, NULL);
2086         if (frc != FSUCCESS)
2087                 CERROR("can't move qp state to error: %d\n", frc);
2088
2089         /* Complete all tx descs not waiting for sends to complete.
2090          * NB we should be safe from RDMA now that the QP has changed state */
2091
2092         kibnal_abort_txs(conn, &conn->ibc_tx_queue);
2093         kibnal_abort_txs(conn, &conn->ibc_tx_queue_rsrvd);
2094         kibnal_abort_txs(conn, &conn->ibc_tx_queue);
2095         kibnal_abort_txs(conn, &conn->ibc_active_txs);
2096
2097         kibnal_handle_early_rxs(conn);
2098 }
2099
/* Account for a failed connection attempt of the given 'type' on 'peer'.
 * If no other attempt is in progress and no connection was established,
 * back off the reconnect interval (exponential, clamped to the tunable
 * min/max), fail the peer's blocked txs with -EHOSTUNREACH, unlink a
 * non-persistent peer, and notify LNet of the error. */
void
kibnal_peer_connect_failed (kib_peer_t *peer, int type, int error)
{
        LIST_HEAD        (zombies);
        unsigned long     flags;

        LASSERT (error != 0);
        LASSERT (!in_interrupt());

        write_lock_irqsave(&kibnal_data.kib_global_lock, flags);

        LASSERT (kibnal_peer_connecting(peer));

        /* drop the in-progress count matching this attempt's type */
        switch (type) {
        case IBNAL_CONN_ACTIVE:
                LASSERT (peer->ibp_connecting > 0);
                peer->ibp_connecting--;
                break;
                
        case IBNAL_CONN_PASSIVE:
                LASSERT (peer->ibp_accepting > 0);
                peer->ibp_accepting--;
                break;
                
        case IBNAL_CONN_WAITING:
                /* Can't assert; I might be racing with a successful connection
                 * which clears passivewait */
                peer->ibp_passivewait = 0;
                break;
        default:
                LBUG();
        }

        if (kibnal_peer_connecting(peer) ||     /* another attempt underway */
            !list_empty(&peer->ibp_conns)) {    /* got connected */
                write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
                return;
        }

        /* Say when active connection can be re-attempted */
        peer->ibp_reconnect_interval *= 2;
        peer->ibp_reconnect_interval =
                MAX(peer->ibp_reconnect_interval,
                    *kibnal_tunables.kib_min_reconnect_interval);
        peer->ibp_reconnect_interval =
                MIN(peer->ibp_reconnect_interval,
                    *kibnal_tunables.kib_max_reconnect_interval);
        
        peer->ibp_reconnect_time = jiffies + peer->ibp_reconnect_interval * HZ;

        /* Take peer's blocked transmits to complete with error */
        list_add(&zombies, &peer->ibp_tx_queue);
        list_del_init(&peer->ibp_tx_queue);
                
        if (kibnal_peer_active(peer) &&
            peer->ibp_persistence == 0) {
                /* failed connection attempt on non-persistent peer */
                kibnal_unlink_peer_locked (peer);
        }

        peer->ibp_error = error;
        
        write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);

        /* may call lnet_notify(); must be outside the lock */
        kibnal_peer_notify(peer);

        if (list_empty (&zombies))
                return;
        
        CDEBUG (D_NETERROR, "Deleting messages for %s: connection failed\n",
                libcfs_nid2str(peer->ibp_nid));

        kibnal_txlist_done (&zombies, -EHOSTUNREACH);
}
2174
/* Complete a connection attempt ('type' is ACTIVE or PASSIVE) on 'conn'
 * with the given 'status'.  On failure, tear the conn down and account
 * the failed attempt; on success, mark the conn ESTABLISHED, hook it
 * onto the peer, retire stale conns from an older peer incarnation, and
 * launch any txs that were queued waiting for a connection. */
void
kibnal_connreq_done (kib_conn_t *conn, int type, int status)
{
        kib_peer_t       *peer = conn->ibc_peer;
        struct list_head  txs;
        kib_tx_t         *tx;
        unsigned long     flags;

        LASSERT (!in_interrupt());
        LASSERT (type == IBNAL_CONN_ACTIVE || type == IBNAL_CONN_PASSIVE);
        LASSERT (conn->ibc_state >= IBNAL_CONN_INIT_QP);
        LASSERT (conn->ibc_state < IBNAL_CONN_ESTABLISHED);
        LASSERT (kibnal_peer_connecting(peer));

        /* connection-setup scratch space is no longer needed */
        LIBCFS_FREE(conn->ibc_cvars, sizeof(*conn->ibc_cvars));
        conn->ibc_cvars = NULL;

        if (status != 0) {
                /* failed to establish connection */
                kibnal_peer_connect_failed(conn->ibc_peer, type, status);
                kibnal_conn_disconnected(conn);
                kibnal_conn_decref(conn);       /* Lose CM's ref */
                return;
        }

        /* connection established */
        LASSERT(conn->ibc_state == IBNAL_CONN_CONNECTING);

        conn->ibc_last_send = jiffies;
        kibnal_set_conn_state(conn, IBNAL_CONN_ESTABLISHED);
        kibnal_peer_alive(peer);

        CDEBUG(D_NET, "Connection %s ESTABLISHED\n",
               libcfs_nid2str(conn->ibc_peer->ibp_nid));

        write_lock_irqsave(&kibnal_data.kib_global_lock, flags);

        peer->ibp_passivewait = 0;              /* not waiting (got conn now) */
        kibnal_conn_addref(conn);               /* +1 ref for ibc_list */
        list_add_tail(&conn->ibc_list, &peer->ibp_conns);
        
        if (!kibnal_peer_active(peer)) {
                /* peer has been deleted */
                kibnal_close_conn_locked(conn, -ECONNABORTED);
                write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

                kibnal_peer_connect_failed(conn->ibc_peer, type, -ECONNABORTED);
                kibnal_conn_decref(conn);       /* lose CM's ref */
                return;
        }
        
        /* drop the in-progress count matching this attempt's type */
        switch (type) {
        case IBNAL_CONN_ACTIVE:
                LASSERT (peer->ibp_connecting > 0);
                peer->ibp_connecting--;
                break;

        case IBNAL_CONN_PASSIVE:
                LASSERT (peer->ibp_accepting > 0);
                peer->ibp_accepting--;
                break;
        default:
                LBUG();
        }
        
        peer->ibp_reconnect_interval = 0;       /* OK to reconnect at any time */

        /* Nuke any dangling conns from a different peer instance... */
        kibnal_close_stale_conns_locked(peer, conn->ibc_incarnation);

        /* grab txs blocking for a conn */
        list_add(&txs, &peer->ibp_tx_queue);
        list_del_init(&peer->ibp_tx_queue);

        write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
        
        /* Schedule blocked txs */
        spin_lock (&conn->ibc_lock);
        while (!list_empty (&txs)) {
                tx = list_entry (txs.next, kib_tx_t, tx_list);
                list_del (&tx->tx_list);

                kibnal_queue_tx_locked (tx, conn);
        }
        spin_unlock (&conn->ibc_lock);
        kibnal_check_sends (conn);
}
2262
/* Reject the connection attempt on CM endpoint 'cep' with reason code
 * 'why' (an IBNAL_REJECT_* index), embedding our magic, protocol version
 * and 'why' in the private data so a compatible peer can decode it (see
 * kibnal_check_connreject()).
 *
 * NB: msgs[] is static and written without locking; all writers store
 * identical values for a given 'why', so the race is benign (as the
 * comment below concedes). */
void
kibnal_reject (lnet_nid_t nid, IB_HANDLE cep, int why)
{
        static CM_REJECT_INFO  msgs[3];
        CM_REJECT_INFO        *msg = &msgs[why];
        FSTATUS                frc;

        LASSERT (why >= 0 && why < sizeof(msgs)/sizeof(msgs[0]));

        /* If I wasn't so lazy, I'd initialise this only once; it's effectively
         * read-only... */
        msg->Reason         = RC_USER_REJ;
        /* little-endian encoding of magic (4 bytes) + version (2 bytes) */
        msg->PrivateData[0] = (IBNAL_MSG_MAGIC) & 0xff;
        msg->PrivateData[1] = (IBNAL_MSG_MAGIC >> 8) & 0xff;
        msg->PrivateData[2] = (IBNAL_MSG_MAGIC >> 16) & 0xff;
        msg->PrivateData[3] = (IBNAL_MSG_MAGIC >> 24) & 0xff;
        msg->PrivateData[4] = (IBNAL_MSG_VERSION) & 0xff;
        msg->PrivateData[5] = (IBNAL_MSG_VERSION >> 8) & 0xff;
        msg->PrivateData[6] = why;

        frc = iba_cm_reject(cep, msg);
        if (frc != FSUCCESS)
                CERROR("Error %d rejecting %s\n", frc, libcfs_nid2str(nid));
}
2287
2288 void
2289 kibnal_check_connreject(kib_conn_t *conn, int type, CM_REJECT_INFO *rej)
2290 {
2291         kib_peer_t    *peer = conn->ibc_peer;
2292         unsigned long  flags;
2293         int            magic;
2294         int            version;
2295         int            why;
2296
2297         LASSERT (type == IBNAL_CONN_ACTIVE ||
2298                  type == IBNAL_CONN_PASSIVE);
2299
2300         CDEBUG(D_NET, "%s connection with %s rejected: %d\n",
2301                (type == IBNAL_CONN_ACTIVE) ? "Active" : "Passive",
2302                libcfs_nid2str(peer->ibp_nid), rej->Reason);
2303
2304         switch (rej->Reason) {
2305         case RC_STALE_CONN:
2306                 if (type == IBNAL_CONN_PASSIVE) {
2307                         CERROR("Connection to %s rejected (stale QP)\n",
2308                                libcfs_nid2str(peer->ibp_nid));
2309                 } else {
2310                         CWARN("Connection from %s rejected (stale QP): "
2311                               "retrying...\n", libcfs_nid2str(peer->ibp_nid));
2312
2313                         /* retry from scratch to allocate a new conn 
2314                          * which will use a different QP */
2315                         kibnal_schedule_active_connect(peer, peer->ibp_version);
2316                 }
2317
2318                 /* An FCM_DISCONNECTED callback is still outstanding: give it a
2319                  * ref since kibnal_connreq_done() drops the CM's ref on conn
2320                  * on failure */
2321                 kibnal_conn_addref(conn);
2322                 break;
2323
2324         case RC_USER_REJ:
2325                 magic   = (rej->PrivateData[0]) |
2326                           (rej->PrivateData[1] << 8) |
2327                           (rej->PrivateData[2] << 16) |
2328                           (rej->PrivateData[3] << 24);
2329                 version = (rej->PrivateData[4]) |
2330                           (rej->PrivateData[5] << 8);
2331                 why     = (rej->PrivateData[6]);
2332
2333                 /* retry with old proto version */
2334                 if (magic == IBNAL_MSG_MAGIC &&
2335                     version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD &&
2336                     conn->ibc_version == IBNAL_MSG_VERSION &&
2337                     type != IBNAL_CONN_PASSIVE) {
2338                         /* retry with a new conn */
2339                         CWARN ("Connection to %s refused: "
2340                                "retrying with old protocol version 0x%x\n", 
2341                                libcfs_nid2str(peer->ibp_nid), version);
2342                         kibnal_schedule_active_connect(peer, version);
2343                         break;
2344                 }
2345
2346                 if (magic != IBNAL_MSG_MAGIC ||
2347                     version != IBNAL_MSG_VERSION) {
2348                         CERROR("%s connection with %s rejected "
2349                                "(magic/ver %08x/%d why %d): "
2350                                "incompatible protocol\n",
2351                                (type == IBNAL_CONN_ACTIVE) ?
2352                                "Active" : "Passive",
2353                                libcfs_nid2str(peer->ibp_nid),
2354                                magic, version, why);
2355                         break;
2356                 }
2357
2358                 if (type == IBNAL_CONN_ACTIVE && 
2359                     why == IBNAL_REJECT_CONN_RACE) {
2360                         /* lost connection race */
2361                         CWARN("Connection to %s rejected: "
2362                               "lost connection race\n",
2363                               libcfs_nid2str(peer->ibp_nid));
2364
2365                         write_lock_irqsave(&kibnal_data.kib_global_lock, 
2366                                            flags);
2367
2368                         if (list_empty(&peer->ibp_conns)) {
2369                                 peer->ibp_passivewait = 1;
2370                                 peer->ibp_passivewait_deadline =
2371                                         jiffies + 
2372                                         (*kibnal_tunables.kib_timeout * HZ);
2373                         }
2374                         write_unlock_irqrestore(&kibnal_data.kib_global_lock, 
2375                                                 flags);
2376                         break;
2377                 }
2378
2379                 CERROR("%s connection with %s rejected: %d\n",
2380                        (type == IBNAL_CONN_ACTIVE) ? "Active" : "Passive",
2381                        libcfs_nid2str(peer->ibp_nid), why);
2382                 break;
2383
2384         default:
2385                 CERROR("%s connection with %s rejected: %d\n",
2386                        (type == IBNAL_CONN_ACTIVE) ? "Active" : "Passive",
2387                        libcfs_nid2str(peer->ibp_nid), rej->Reason);
2388         }
2389         
2390         kibnal_connreq_done(conn, type, -ECONNREFUSED);
2391 }
2392
2393 void
2394 kibnal_cm_disconnect_callback(kib_conn_t *conn, CM_CONN_INFO *info)
2395 {
2396         CDEBUG(D_NET, "%s: state %d, status 0x%x\n", 
2397                libcfs_nid2str(conn->ibc_peer->ibp_nid),
2398                conn->ibc_state, info->Status);
2399         
2400         LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
2401
2402         switch (info->Status) {
2403         default:
2404                 LBUG();
2405                 break;
2406
2407         case FCM_DISCONNECT_REQUEST:
2408                 /* Schedule conn to iba_cm_disconnect() if it wasn't already */
2409                 kibnal_close_conn (conn, 0);
2410                 break;
2411
2412         case FCM_DISCONNECT_REPLY:              /* peer acks my disconnect req */
2413         case FCM_DISCONNECTED:                  /* end of TIME_WAIT */
2414                 CDEBUG(D_NET, "Connection %s disconnected.\n",
2415                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
2416                 kibnal_conn_decref(conn);       /* Lose CM's ref */
2417                 break;
2418         }
2419 }
2420
2421 void
2422 kibnal_cm_passive_callback(IB_HANDLE cep, CM_CONN_INFO *info, void *arg)
2423 {
2424         kib_conn_t       *conn = arg;
2425
2426         CDEBUG(D_NET, "status 0x%x\n", info->Status);
2427
2428         /* Established Connection Notifier */
2429         switch (info->Status) {
2430         default:
2431                 CERROR("Unexpected status %d on Connection %s\n",
2432                        info->Status, libcfs_nid2str(conn->ibc_peer->ibp_nid));
2433                 LBUG();
2434                 break;
2435
2436         case FCM_CONNECT_TIMEOUT:
2437                 kibnal_connreq_done(conn, IBNAL_CONN_PASSIVE, -ETIMEDOUT);
2438                 break;
2439                 
2440         case FCM_CONNECT_REJECT:
2441                 kibnal_check_connreject(conn, IBNAL_CONN_PASSIVE, 
2442                                         &info->Info.Reject);
2443                 break;
2444
2445         case FCM_CONNECT_ESTABLISHED:
2446                 kibnal_connreq_done(conn, IBNAL_CONN_PASSIVE, 0);
2447                 break;
2448
2449         case FCM_DISCONNECT_REQUEST:
2450         case FCM_DISCONNECT_REPLY:
2451         case FCM_DISCONNECTED:
2452                 kibnal_cm_disconnect_callback(conn, info);
2453                 break;
2454         }
2455 }
2456
/* Validate the connection request in 'msg' ('nob' bytes of CM private
 * data received on endpoint 'cep') and set up a conn for it, creating
 * the peer if this NID is not already known.  On success the new conn
 * (state CONNECTING, bound to its peer) is returned in *connp and 0 is
 * returned; on any failure the request is rejected via kibnal_reject()
 * and a -ve errno is returned. */
int
kibnal_accept (kib_conn_t **connp, IB_HANDLE cep, kib_msg_t *msg, int nob)
{
        lnet_nid_t     nid;
        kib_conn_t    *conn;
        kib_peer_t    *peer;
        kib_peer_t    *peer2;
        unsigned long  flags;
        int            rc;

        rc = kibnal_unpack_msg(msg, 0, nob);
        if (rc != 0) {
                /* SILENT! kibnal_unpack_msg() complains if required */
                kibnal_reject(LNET_NID_ANY, cep, IBNAL_REJECT_FATAL);
                return -EPROTO;
        }

        nid = msg->ibm_srcnid;

        /* old protocol versions are tolerated; just warn */
        if (msg->ibm_version != IBNAL_MSG_VERSION)
                CWARN("Connection from %s: old protocol version 0x%x\n",
                      libcfs_nid2str(nid), msg->ibm_version);

        if (msg->ibm_type != IBNAL_MSG_CONNREQ) {
                CERROR("Can't accept %s: bad request type %d (%d expected)\n",
                       libcfs_nid2str(nid), msg->ibm_type, IBNAL_MSG_CONNREQ);
                kibnal_reject(nid, cep, IBNAL_REJECT_FATAL);
                return -EPROTO;
        }

        /* the request must have been addressed to me */
        if (msg->ibm_dstnid != kibnal_data.kib_ni->ni_nid) {
                CERROR("Can't accept %s: bad dst NID %s (%s expected)\n",
                       libcfs_nid2str(nid), 
                       libcfs_nid2str(msg->ibm_dstnid), 
                       libcfs_nid2str(kibnal_data.kib_ni->ni_nid));
                kibnal_reject(nid, cep, IBNAL_REJECT_FATAL);
                return -EPROTO;
        }

        /* queue depth must match exactly; message size and RDMA fragment
         * count may not exceed my limits */
        if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE ||
            msg->ibm_u.connparams.ibcp_max_msg_size > IBNAL_MSG_SIZE ||
            msg->ibm_u.connparams.ibcp_max_frags > IBNAL_MAX_RDMA_FRAGS) {
                CERROR("Reject %s: q %d sz %d frag %d, (%d %d %d expected)\n",
                       libcfs_nid2str(nid), 
                       msg->ibm_u.connparams.ibcp_queue_depth,
                       msg->ibm_u.connparams.ibcp_max_msg_size,
                       msg->ibm_u.connparams.ibcp_max_frags,
                       IBNAL_MSG_QUEUE_SIZE,
                       IBNAL_MSG_SIZE,
                       IBNAL_MAX_RDMA_FRAGS);
                kibnal_reject(nid, cep, IBNAL_REJECT_FATAL);
                return -EPROTO;
        }

        conn = kibnal_create_conn(nid, msg->ibm_version);
        if (conn == NULL) {
                kibnal_reject(nid, cep, IBNAL_REJECT_NO_RESOURCES);
                return -ENOMEM;
        }
        
        /* assume 'nid' is a new peer */
        rc = kibnal_create_peer(&peer, nid);
        if (rc != 0) {
                kibnal_conn_decref(conn);
                kibnal_reject(nid, cep, IBNAL_REJECT_NO_RESOURCES);
                return -ENOMEM;
        }
        
        write_lock_irqsave (&kibnal_data.kib_global_lock, flags);

        if (kibnal_data.kib_listener_cep == NULL) { /* shutdown started */
                write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);

                kibnal_peer_decref(peer);
                kibnal_conn_decref(conn);
                kibnal_reject(nid, cep, IBNAL_REJECT_NO_RESOURCES);
                return -ESHUTDOWN;
        }

        peer2 = kibnal_find_peer_locked(nid);
        if (peer2 == NULL) {
                /* peer table takes my ref on peer */
                list_add_tail (&peer->ibp_list, kibnal_nid2peerlist(nid));
                LASSERT (peer->ibp_connecting == 0);
        } else {
                /* 'nid' was already known: discard the speculative peer */
                kibnal_peer_decref(peer);
                peer = peer2;

                if (peer->ibp_connecting != 0 &&
                    peer->ibp_nid < kibnal_data.kib_ni->ni_nid) {
                        /* Resolve concurrent connection attempts in favour of
                         * the higher NID */
                        write_unlock_irqrestore(&kibnal_data.kib_global_lock, 
                                                flags);
                        kibnal_conn_decref(conn);
                        kibnal_reject(nid, cep, IBNAL_REJECT_CONN_RACE);
                        return -EALREADY;
                }
        }

        kibnal_peer_addref(peer); /* +1 ref for conn */
        peer->ibp_accepting++;    /* flag a passive connection in progress */

        kibnal_set_conn_state(conn, IBNAL_CONN_CONNECTING);
        conn->ibc_peer = peer;
        conn->ibc_incarnation = msg->ibm_srcstamp;
        conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
        conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
        /* credits are backed by pre-posted receive buffers */
        LASSERT (conn->ibc_credits + conn->ibc_reserved_credits
                 <= IBNAL_RX_MSGS);

        write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

        *connp = conn;
        return 0;
}
2573
/* CM callback on the passive (listening) endpoint.  Handles listener
 * cancellation during shutdown and incoming connection requests; any
 * other status is a fatal surprise.  On a valid request it creates the
 * conn, brings the QP to RTS and replies via iba_cm_accept(), handing
 * the conn to kibnal_cm_passive_callback for subsequent CM events. */
void
kibnal_listen_callback(IB_HANDLE cep, CM_CONN_INFO *info, void *arg)
{

        CM_REQUEST_INFO  *req = &info->Info.Request;
        CM_REPLY_INFO    *rep;
        kib_conn_t       *conn;
        FSTATUS           frc;
        int               rc;
        
        LASSERT(arg == NULL); /* no conn yet for passive */

        CDEBUG(D_NET, "%x\n", info->Status);
        
        if (info->Status == FCM_CONNECT_CANCEL) {
                /* listener torn down: wake the thread waiting in shutdown */
                up(&kibnal_data.kib_listener_signal);
                return;
        }
        
        LASSERT (info->Status == FCM_CONNECT_REQUEST);

        /* validate the request and create conn + peer for it */
        rc = kibnal_accept(&conn, cep, (kib_msg_t *)req->PrivateData, 
                           CM_REQUEST_INFO_USER_LEN);
        if (rc != 0)                   /* kibnal_accept has rejected */
                return;

        conn->ibc_cvars->cv_path = req->PathInfo.Path;
        
        /* move my QP to RTS using the requester's CEP parameters */
        rc = kibnal_conn_rts(conn, 
                             req->CEPInfo.QPN, 
                             req->CEPInfo.OfferedInitiatorDepth,
                             req->CEPInfo.OfferedResponderResources,
                             req->CEPInfo.StartingPSN);
        if (rc != 0) {
                kibnal_reject(conn->ibc_peer->ibp_nid, cep, 
                              IBNAL_REJECT_NO_RESOURCES);
                kibnal_connreq_done(conn, IBNAL_CONN_PASSIVE, -ECONNABORTED);
                return;
        }

        /* build the CM reply from my local QP attributes */
        memset(&conn->ibc_cvars->cv_cmci, 0, sizeof(conn->ibc_cvars->cv_cmci));
        rep = &conn->ibc_cvars->cv_cmci.Info.Reply;

        rep->QPN                   = conn->ibc_cvars->cv_qpattrs.QPNumber;
        rep->QKey                  = conn->ibc_cvars->cv_qpattrs.Qkey;
        rep->StartingPSN           = conn->ibc_cvars->cv_qpattrs.RecvPSN;
        rep->EndToEndFlowControl   = conn->ibc_cvars->cv_qpattrs.FlowControl;
        rep->ArbInitiatorDepth     = conn->ibc_cvars->cv_qpattrs.InitiatorDepth;
        rep->ArbResponderResources = conn->ibc_cvars->cv_qpattrs.ResponderResources;
        rep->TargetAckDelay        = kibnal_data.kib_hca_attrs.LocalCaAckDelay;
        rep->FailoverAccepted      = IBNAL_FAILOVER_ACCEPTED;
        rep->RnRRetryCount         = req->CEPInfo.RnrRetryCount;
        
        /* my connack message must fit in the reply's private data */
        CLASSERT (CM_REPLY_INFO_USER_LEN >=
                  offsetof(kib_msg_t, ibm_u) + sizeof(kib_connparams_t));

        kibnal_pack_connmsg((kib_msg_t *)rep->PrivateData,
                            conn->ibc_version,
                            CM_REPLY_INFO_USER_LEN,
                            IBNAL_MSG_CONNACK,
                            conn->ibc_peer->ibp_nid, conn->ibc_incarnation);

        LASSERT (conn->ibc_cep == NULL);
        kibnal_set_conn_state(conn, IBNAL_CONN_CONNECTING);

        /* hand the conn to the CM: further events arrive on
         * kibnal_cm_passive_callback */
        frc = iba_cm_accept(cep, 
                            &conn->ibc_cvars->cv_cmci,
                            NULL,
                            kibnal_cm_passive_callback, conn, 
                            &conn->ibc_cep);

        if (frc == FSUCCESS || frc == FPENDING)
                return;
        
        CERROR("iba_cm_accept(%s) failed: %d\n", 
               libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
        kibnal_connreq_done(conn, IBNAL_CONN_PASSIVE, -ECONNABORTED);
}
2652
2653 void
2654 kibnal_check_connreply(kib_conn_t *conn, CM_REPLY_INFO *rep)
2655 {
2656         kib_msg_t   *msg = (kib_msg_t *)rep->PrivateData;
2657         lnet_nid_t   nid = conn->ibc_peer->ibp_nid;
2658         FSTATUS      frc;
2659         int          rc;
2660
2661         rc = kibnal_unpack_msg(msg, conn->ibc_version, CM_REPLY_INFO_USER_LEN);
2662         if (rc != 0) {
2663                 CERROR ("Error %d unpacking connack from %s\n",
2664                         rc, libcfs_nid2str(nid));
2665                 kibnal_reject(nid, conn->ibc_cep, IBNAL_REJECT_FATAL);
2666                 kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EPROTO);
2667                 return;
2668         }
2669                         
2670         if (msg->ibm_type != IBNAL_MSG_CONNACK) {
2671                 CERROR("Bad connack request type %d (%d expected) from %s\n",
2672                        msg->ibm_type, IBNAL_MSG_CONNREQ,
2673                        libcfs_nid2str(msg->ibm_srcnid));
2674                 kibnal_reject(nid, conn->ibc_cep, IBNAL_REJECT_FATAL);
2675                 kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EPROTO);
2676                 return;
2677         }
2678
2679         if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
2680             msg->ibm_dstnid != kibnal_data.kib_ni->ni_nid ||
2681             msg->ibm_dststamp != kibnal_data.kib_incarnation) {
2682                 CERROR("Stale connack from %s(%s): %s(%s), "LPX64"("LPX64")\n",
2683                        libcfs_nid2str(msg->ibm_srcnid), 
2684                        libcfs_nid2str(conn->ibc_peer->ibp_nid),
2685                        libcfs_nid2str(msg->ibm_dstnid),
2686                        libcfs_nid2str(kibnal_data.kib_ni->ni_nid),
2687                        msg->ibm_dststamp, kibnal_data.kib_incarnation);
2688                 kibnal_reject(nid, conn->ibc_cep, IBNAL_REJECT_FATAL);
2689                 kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -ESTALE);
2690                 return;
2691         }
2692         
2693         if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE ||
2694             msg->ibm_u.connparams.ibcp_max_msg_size > IBNAL_MSG_SIZE ||
2695             msg->ibm_u.connparams.ibcp_max_frags > IBNAL_MAX_RDMA_FRAGS) {
2696                 CERROR("Reject %s: q %d sz %d frag %d, (%d %d %d expected)\n",
2697                        libcfs_nid2str(msg->ibm_srcnid), 
2698                        msg->ibm_u.connparams.ibcp_queue_depth,
2699                        msg->ibm_u.connparams.ibcp_max_msg_size,
2700                        msg->ibm_u.connparams.ibcp_max_frags,
2701                        IBNAL_MSG_QUEUE_SIZE,
2702                        IBNAL_MSG_SIZE,
2703                        IBNAL_MAX_RDMA_FRAGS);
2704                 kibnal_reject(nid, conn->ibc_cep, IBNAL_REJECT_FATAL);
2705                 kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EPROTO);
2706                 return;
2707         }
2708                         
2709         CDEBUG(D_NET, "Connection %s REP_RECEIVED.\n",
2710                libcfs_nid2str(conn->ibc_peer->ibp_nid));
2711
2712         conn->ibc_incarnation = msg->ibm_srcstamp;
2713         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2714         conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
2715         LASSERT (conn->ibc_credits + conn->ibc_reserved_credits
2716                  <= IBNAL_RX_MSGS);
2717
2718         rc = kibnal_conn_rts(conn, 
2719                              rep->QPN,
2720                              rep->ArbInitiatorDepth,
2721                              rep->ArbResponderResources,
2722                              rep->StartingPSN);
2723         if (rc != 0) {
2724                 kibnal_reject(nid, conn->ibc_cep, IBNAL_REJECT_NO_RESOURCES);
2725                 kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EIO);
2726                 return;
2727         }
2728
2729         memset(&conn->ibc_cvars->cv_cmci, 0, sizeof(conn->ibc_cvars->cv_cmci));
2730         
2731         frc = iba_cm_accept(conn->ibc_cep, 
2732                             &conn->ibc_cvars->cv_cmci, 
2733                             NULL, NULL, NULL, NULL);
2734
2735         if (frc == FCM_CONNECT_ESTABLISHED) {
2736                 kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, 0);
2737                 return;
2738         }
2739         
2740         CERROR("Connection %s CMAccept failed: %d\n",
2741                libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
2742         kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -ECONNABORTED);
2743 }
2744
2745 void
2746 kibnal_cm_active_callback(IB_HANDLE cep, CM_CONN_INFO *info, void *arg)
2747 {
2748         kib_conn_t       *conn = arg;
2749
2750         CDEBUG(D_NET, "status 0x%x\n", info->Status);
2751
2752         switch (info->Status) {
2753         default:
2754                 CERROR("unknown status %d on Connection %s\n", 
2755                        info->Status, libcfs_nid2str(conn->ibc_peer->ibp_nid));
2756                 LBUG();
2757                 break;
2758
2759         case FCM_CONNECT_TIMEOUT:
2760                 kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -ETIMEDOUT);
2761                 break;
2762                 
2763         case FCM_CONNECT_REJECT:
2764                 kibnal_check_connreject(conn, IBNAL_CONN_ACTIVE,
2765                                         &info->Info.Reject);
2766                 break;
2767
2768         case FCM_CONNECT_REPLY:
2769                 kibnal_check_connreply(conn, &info->Info.Reply);
2770                 break;
2771
2772         case FCM_DISCONNECT_REQUEST:
2773         case FCM_DISCONNECT_REPLY:
2774         case FCM_DISCONNECTED:
2775                 kibnal_cm_disconnect_callback(conn, info);
2776                 break;
2777         }
2778 }
2779
2780 void
2781 dump_path_records(PATH_RESULTS *results)
2782 {
2783         IB_PATH_RECORD *path;
2784         int i;
2785
2786         for (i = 0; i < results->NumPathRecords; i++) {
2787                 path = &results->PathRecords[i];
2788                 CDEBUG(D_NET, "%d: sgid "LPX64":"LPX64" dgid "
2789                        LPX64":"LPX64" pkey %x\n",
2790                        i,
2791                        path->SGID.Type.Global.SubnetPrefix,
2792                        path->SGID.Type.Global.InterfaceID,
2793                        path->DGID.Type.Global.SubnetPrefix,
2794                        path->DGID.Type.Global.InterfaceID,
2795                        path->P_Key);
2796         }
2797 }
2798
/* Completion callback for the SA path-record query issued by
 * kibnal_service_get_callback().  On success, saves the first path
 * record, creates the active-side CEP and fires the CM connect request;
 * on any failure the connection attempt is failed via
 * kibnal_connreq_done(). */
void
kibnal_pathreq_callback (void *arg, QUERY *qry, 
                         QUERY_RESULT_VALUES *qrslt)
{
        IB_CA_ATTRIBUTES  *ca_attr = &kibnal_data.kib_hca_attrs;
        kib_conn_t        *conn = arg;
        CM_REQUEST_INFO   *req = &conn->ibc_cvars->cv_cmci.Info.Request;
        PATH_RESULTS      *path = (PATH_RESULTS *)qrslt->QueryResult;
        FSTATUS            frc;
        
        if (qrslt->Status != FSUCCESS || 
            qrslt->ResultDataSize < sizeof(*path)) {
                CDEBUG (D_NETERROR, "pathreq %s failed: status %d data size %d\n", 
                        libcfs_nid2str(conn->ibc_peer->ibp_nid),
                        qrslt->Status, qrslt->ResultDataSize);
                kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EHOSTUNREACH);
                return;
        }

        if (path->NumPathRecords < 1) {
                CDEBUG (D_NETERROR, "pathreq %s failed: no path records\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
                kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EHOSTUNREACH);
                return;
        }

        //dump_path_records(path);
        /* just use the first path record returned */
        conn->ibc_cvars->cv_path = path->PathRecords[0];

        LASSERT (conn->ibc_cep == NULL);

        conn->ibc_cep = kibnal_create_cep(conn->ibc_peer->ibp_nid);
        if (conn->ibc_cep == NULL) {
                kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -ENOMEM);
                return;
        }

        /* build the CM connect request from the service record, the path
         * and my local QP attributes */
        memset(req, 0, sizeof(*req));
        req->SID                               = conn->ibc_cvars->cv_svcrec.RID.ServiceID;
        req->CEPInfo.CaGUID                    = kibnal_data.kib_hca_guids[kibnal_data.kib_hca_idx];
        req->CEPInfo.EndToEndFlowControl       = IBNAL_EE_FLOW;
        req->CEPInfo.PortGUID                  = conn->ibc_cvars->cv_path.SGID.Type.Global.InterfaceID;
        req->CEPInfo.RetryCount                = IBNAL_RETRY;
        req->CEPInfo.RnrRetryCount             = IBNAL_RNR_RETRY;
        req->CEPInfo.AckTimeout                = IBNAL_ACK_TIMEOUT;
        req->CEPInfo.StartingPSN               = IBNAL_STARTING_PSN;
        req->CEPInfo.QPN                       = conn->ibc_cvars->cv_qpattrs.QPNumber;
        req->CEPInfo.QKey                      = conn->ibc_cvars->cv_qpattrs.Qkey;
        req->CEPInfo.OfferedResponderResources = ca_attr->MaxQPResponderResources;
        req->CEPInfo.OfferedInitiatorDepth     = ca_attr->MaxQPInitiatorDepth;
        req->PathInfo.bSubnetLocal             = IBNAL_LOCAL_SUB;
        req->PathInfo.Path                     = conn->ibc_cvars->cv_path;

        /* my connreq message must fit in the request's private data */
        CLASSERT (CM_REQUEST_INFO_USER_LEN >=
                  offsetof(kib_msg_t, ibm_u) + sizeof(kib_connparams_t));

        kibnal_pack_connmsg((kib_msg_t *)req->PrivateData, 
                            conn->ibc_version,
                            CM_REQUEST_INFO_USER_LEN,
                            IBNAL_MSG_CONNREQ, 
                            conn->ibc_peer->ibp_nid, 0);

        if (the_lnet.ln_testprotocompat != 0) {
                /* single-shot proto test: deliberately corrupt version
                 * and/or magic to exercise the peer's compat handling */
                LNET_LOCK();
                if ((the_lnet.ln_testprotocompat & 1) != 0) {
                        ((kib_msg_t *)req->PrivateData)->ibm_version++;
                        the_lnet.ln_testprotocompat &= ~1;
                }
                if ((the_lnet.ln_testprotocompat & 2) != 0) {
                        ((kib_msg_t *)req->PrivateData)->ibm_magic =
                                LNET_PROTO_MAGIC;
                        the_lnet.ln_testprotocompat &= ~2;
                }
                LNET_UNLOCK();
        }

        /* Flag I'm getting involved with the CM... */
        kibnal_set_conn_state(conn, IBNAL_CONN_CONNECTING);

        /* cm callback gets my conn ref */
        frc = iba_cm_connect(conn->ibc_cep, req, 
                             kibnal_cm_active_callback, conn);
        if (frc == FPENDING || frc == FSUCCESS)
                return;
        
        CERROR ("Connect %s failed: %d\n", 
                libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
        kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EHOSTUNREACH);
}
2889
2890 void
2891 kibnal_dump_service_records(SERVICE_RECORD_RESULTS *results)
2892 {
2893         IB_SERVICE_RECORD *svc;
2894         int i;
2895
2896         for (i = 0; i < results->NumServiceRecords; i++) {
2897                 svc = &results->ServiceRecords[i];
2898                 CDEBUG(D_NET, "%d: sid "LPX64" gid "LPX64":"LPX64" pkey %x\n",
2899                        i,
2900                        svc->RID.ServiceID,
2901                        svc->RID.ServiceGID.Type.Global.SubnetPrefix,
2902                        svc->RID.ServiceGID.Type.Global.InterfaceID,
2903                        svc->RID.ServiceP_Key);
2904         }
2905 }
2906
/* Completion callback for the service-record lookup issued by
 * kibnal_connect_peer().  On success, saves the first service record
 * and chains a path-record query towards the peer's port; on any
 * failure the connection attempt is failed via kibnal_connreq_done(). */
void
kibnal_service_get_callback (void *arg, QUERY *qry, 
                             QUERY_RESULT_VALUES *qrslt)
{
        kib_conn_t              *conn = arg;
        SERVICE_RECORD_RESULTS  *svc;
        FSTATUS                  frc;

        if (qrslt->Status != FSUCCESS || 
            qrslt->ResultDataSize < sizeof(*svc)) {
                CDEBUG (D_NETERROR, "Lookup %s failed: status %d data size %d\n", 
                        libcfs_nid2str(conn->ibc_peer->ibp_nid),
                        qrslt->Status, qrslt->ResultDataSize);
                kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EHOSTUNREACH);
                return;
        }

        svc = (SERVICE_RECORD_RESULTS *)qrslt->QueryResult;
        if (svc->NumServiceRecords < 1) {
                CDEBUG (D_NETERROR, "lookup %s failed: no service records\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
                kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EHOSTUNREACH);
                return;
        }

        //kibnal_dump_service_records(svc);
        /* just use the first service record returned */
        conn->ibc_cvars->cv_svcrec = svc->ServiceRecords[0];

        /* build a path query from my port to the service's port */
        qry = &conn->ibc_cvars->cv_query;
        memset(qry, 0, sizeof(*qry));

        qry->OutputType = OutputTypePathRecord;
        qry->InputType = InputTypePortGuidPair;

        qry->InputValue.PortGuidPair.SourcePortGuid = 
                kibnal_data.kib_port_guid;
        qry->InputValue.PortGuidPair.DestPortGuid  = 
                conn->ibc_cvars->cv_svcrec.RID.ServiceGID.Type.Global.InterfaceID;

        /* kibnal_pathreq_callback gets my conn ref */
        frc = iba_sd_query_port_fabric_info(kibnal_data.kib_sd,
                                            kibnal_data.kib_port_guid,
                                            qry, 
                                            kibnal_pathreq_callback,
                                            &kibnal_data.kib_sdretry,
                                            conn);
        if (frc == FPENDING)
                return;

        CERROR ("pathreq %s failed: %d\n", 
                libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
        kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EHOSTUNREACH);
}
2960
/* Start an active connection attempt to 'peer': allocate a conn and
 * kick off the service-record lookup that eventually leads (via
 * kibnal_service_get_callback / kibnal_pathreq_callback) to the CM
 * connect.  On failure the attempt is failed immediately. */
void
kibnal_connect_peer (kib_peer_t *peer)
{
        QUERY                     *qry;
        FSTATUS                    frc;
        kib_conn_t                *conn;

        LASSERT (peer->ibp_connecting != 0);

        conn = kibnal_create_conn(peer->ibp_nid, peer->ibp_version);
        if (conn == NULL) {
                CERROR ("Can't allocate conn\n");
                kibnal_peer_connect_failed(peer, IBNAL_CONN_ACTIVE, -ENOMEM);
                return;
        }

        /* conn takes its own ref on the peer */
        conn->ibc_peer = peer;
        kibnal_peer_addref(peer);

        /* build the service-record lookup keyed on the peer's NID */
        qry = &conn->ibc_cvars->cv_query;
        memset(qry, 0, sizeof(*qry));

        qry->OutputType = OutputTypeServiceRecord;
        qry->InputType = InputTypeServiceRecord;

        qry->InputValue.ServiceRecordValue.ComponentMask = 
                KIBNAL_SERVICE_KEY_MASK;
        kibnal_set_service_keys(
                &qry->InputValue.ServiceRecordValue.ServiceRecord, 
                peer->ibp_nid);

        /* kibnal_service_get_callback gets my conn ref */
        frc = iba_sd_query_port_fabric_info(kibnal_data.kib_sd,
                                            kibnal_data.kib_port_guid,
                                            qry,
                                            kibnal_service_get_callback,
                                            &kibnal_data.kib_sdretry, 
                                            conn);
        if (frc == FPENDING)
                return;

        CERROR("Lookup %s failed: %d\n", libcfs_nid2str(peer->ibp_nid), frc);
        kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EHOSTUNREACH);
}
3005
3006 int
3007 kibnal_check_txs (kib_conn_t *conn, struct list_head *txs)
3008 {
3009         kib_tx_t          *tx;
3010         struct list_head  *ttmp;
3011         int                timed_out = 0;
3012
3013         spin_lock(&conn->ibc_lock);
3014
3015         list_for_each (ttmp, txs) {
3016                 tx = list_entry (ttmp, kib_tx_t, tx_list);
3017
3018                 if (txs == &conn->ibc_active_txs) {
3019                         LASSERT (!tx->tx_queued);
3020                         LASSERT (tx->tx_waiting || tx->tx_sending != 0);
3021                 } else {
3022                         LASSERT (tx->tx_queued);
3023                 }
3024
3025                 if (time_after_eq (jiffies, tx->tx_deadline)) {
3026                         timed_out = 1;
3027                         break;
3028                 }
3029         }
3030
3031         spin_unlock(&conn->ibc_lock);
3032         return timed_out;
3033 }
3034
3035 int
3036 kibnal_conn_timed_out (kib_conn_t *conn)
3037 {
3038         return  kibnal_check_txs(conn, &conn->ibc_tx_queue) ||
3039                 kibnal_check_txs(conn, &conn->ibc_tx_queue_rsrvd) ||
3040                 kibnal_check_txs(conn, &conn->ibc_tx_queue_nocred) ||
3041                 kibnal_check_txs(conn, &conn->ibc_active_txs);
3042 }
3043
/* Scan one bucket ('idx') of the peer hash table for (a) peers whose
 * passive-connection wait has exceeded its deadline and (b) established
 * connections that kibnal_conn_timed_out() reports as timed out.  Whenever
 * the shared lock has to be dropped to act on a peer/conn, the whole bucket
 * scan is restarted from the top ('goto again'), so list iterators are
 * never used across a lock drop. */
void
kibnal_check_peers (int idx)
{
        rwlock_t          *rwlock = &kibnal_data.kib_global_lock;
        struct list_head  *peers = &kibnal_data.kib_peers[idx];
        struct list_head  *ptmp;
        kib_peer_t        *peer;
        kib_conn_t        *conn;
        struct list_head  *ctmp;
        unsigned long      flags;

 again:
        /* NB. We expect to have a look at all the peers and not find any
         * rdmas to time out, so we just use a shared lock while we
         * take a look... */
        read_lock_irqsave(rwlock, flags);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, kib_peer_t, ibp_list);

                if (peer->ibp_passivewait) {
                        /* Waiting for the peer to connect to us: it can have
                         * no live connections while in this state. */
                        LASSERT (list_empty(&peer->ibp_conns));
                        
                        if (!time_after_eq(jiffies, 
                                           peer->ibp_passivewait_deadline))
                                continue;
                        
                        /* Deadline passed: fail the connection attempt.  Take
                         * a ref so the peer survives the lock drop. */
                        kibnal_peer_addref(peer); /* ++ ref for me... */
                        read_unlock_irqrestore(rwlock, flags);

                        kibnal_peer_connect_failed(peer, IBNAL_CONN_WAITING,
                                                   -ETIMEDOUT);
                        kibnal_peer_decref(peer); /* ...until here */
                        
                        /* start again now I've dropped the lock */
                        goto again;
                }

                list_for_each (ctmp, &peer->ibp_conns) {
                        conn = list_entry (ctmp, kib_conn_t, ibc_list);

                        LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);

                        /* In case we have enough credits to return via a
                         * NOOP, but there were no non-blocking tx descs
                         * free to do it last time... */
                        kibnal_check_sends(conn);

                        if (!kibnal_conn_timed_out(conn))
                                continue;

                        /* Handle timeout by closing the whole connection.  We
                         * can only be sure RDMA activity has ceased once the
                         * QP has been modified. */
                        
                        /* Ref keeps the conn alive across the lock drop. */
                        kibnal_conn_addref(conn); /* 1 ref for me... */

                        read_unlock_irqrestore(rwlock, flags);

                        CERROR("Timed out RDMA with %s\n",
                               libcfs_nid2str(peer->ibp_nid));

                        kibnal_close_conn (conn, -ETIMEDOUT);
                        kibnal_conn_decref(conn); /* ...until here */

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(rwlock, flags);
}
3116
3117 void
3118 kibnal_disconnect_conn (kib_conn_t *conn)
3119 {
3120         FSTATUS       frc;
3121
3122         LASSERT (conn->ibc_state == IBNAL_CONN_DISCONNECTING);
3123
3124         kibnal_conn_disconnected(conn);
3125                 
3126         frc = iba_cm_disconnect(conn->ibc_cep, NULL, NULL);
3127         switch (frc) {
3128         case FSUCCESS:
3129                 break;
3130                 
3131         case FINSUFFICIENT_RESOURCES:
3132                 CERROR("ENOMEM disconnecting %s\n",
3133                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
3134                 /* This might cause the module to become unloadable since the
3135                  * FCM_DISCONNECTED callback is still outstanding */
3136                 break;
3137                 
3138         default:
3139                 CERROR("Unexpected error disconnecting %s: %d\n",
3140                        libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
3141                 LBUG();
3142         }
3143
3144         kibnal_peer_notify(conn->ibc_peer);
3145 }
3146
/* Connection daemon thread.  Loops until shutdown, draining three work
 * lists under kib_connd_lock (zombie conns to destroy, conns to
 * disconnect, peers to connect) and periodically sweeping the peer hash
 * table for RDMA timeouts.  The lock is dropped around every callout and
 * re-taken afterwards.  Sleeps on kib_connd_waitq when idle. */
int
kibnal_connd (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        kib_conn_t        *conn;
        kib_peer_t        *peer;
        int                timeout;
        int                i;
        int                did_something;
        int                peer_index = 0;       /* next hash bucket to sweep */
        unsigned long      deadline = jiffies;   /* next timeout-sweep time */
        
        cfs_daemonize ("kibnal_connd");
        cfs_block_allsigs ();

        init_waitqueue_entry (&wait, current);

        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);

        while (!kibnal_data.kib_shutdown) {
                did_something = 0;

                /* 1: destroy one zombie connection (final teardown) */
                if (!list_empty (&kibnal_data.kib_connd_zombies)) {
                        conn = list_entry (kibnal_data.kib_connd_zombies.next,
                                           kib_conn_t, ibc_list);
                        list_del (&conn->ibc_list);
                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
                        did_something = 1;

                        kibnal_destroy_conn(conn);

                        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                }

                /* 2: issue the CM disconnect for one connection */
                if (!list_empty (&kibnal_data.kib_connd_conns)) {
                        conn = list_entry (kibnal_data.kib_connd_conns.next,
                                           kib_conn_t, ibc_list);
                        list_del (&conn->ibc_list);
                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
                        did_something = 1;

                        kibnal_disconnect_conn(conn);
                        kibnal_conn_decref(conn);
                        
                        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                }

                /* 3: initiate an active connect to one peer */
                if (!list_empty (&kibnal_data.kib_connd_peers)) {
                        peer = list_entry (kibnal_data.kib_connd_peers.next,
                                           kib_peer_t, ibp_connd_list);
                        
                        list_del_init (&peer->ibp_connd_list);
                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
                        did_something = 1;

                        kibnal_connect_peer (peer);
                        kibnal_peer_decref (peer);

                        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                }

                /* careful with the jiffy wrap... */
                while ((timeout = (int)(deadline - jiffies)) <= 0) {
                        const int n = 4;
                        const int p = 1;
                        int       chunk = kibnal_data.kib_peer_hash_size;
                        
                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

                        /* Time to check for RDMA timeouts on a few more
                         * peers: I do checks every 'p' seconds on a
                         * proportion of the peer table and I need to check
                         * every connection 'n' times within a timeout
                         * interval, to ensure I detect a timeout on any
                         * connection within (n+1)/n times the timeout
                         * interval. */

                        if (*kibnal_tunables.kib_timeout > n * p)
                                chunk = (chunk * n * p) / 
                                        *kibnal_tunables.kib_timeout;
                        if (chunk == 0)
                                chunk = 1;

                        for (i = 0; i < chunk; i++) {
                                kibnal_check_peers (peer_index);
                                peer_index = (peer_index + 1) % 
                                             kibnal_data.kib_peer_hash_size;
                        }

                        deadline += p * HZ;
                        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                        did_something = 1;
                }

                if (did_something)
                        continue;

                spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

                /* Nothing done this pass: sleep until woken or the next
                 * timeout-sweep deadline.  NOTE(review): the sleep condition
                 * below re-checks kib_connd_conns/kib_connd_peers but not
                 * kib_connd_zombies — zombie destruction queued in this
                 * window may wait up to 'timeout' jiffies; confirm the waker
                 * always signals kib_connd_waitq when queueing zombies. */
                set_current_state (TASK_INTERRUPTIBLE);
                add_wait_queue (&kibnal_data.kib_connd_waitq, &wait);

                if (!kibnal_data.kib_shutdown &&
                    list_empty (&kibnal_data.kib_connd_conns) &&
                    list_empty (&kibnal_data.kib_connd_peers))
                        schedule_timeout (timeout);

                set_current_state (TASK_RUNNING);
                remove_wait_queue (&kibnal_data.kib_connd_waitq, &wait);

                spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
        }

        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

        kibnal_thread_fini ();
        return (0);
}
3266
3267
/* HCA asynchronous event callback: currently a stub that only logs the
 * event type and code. */
void 
kibnal_hca_async_callback (void *hca_arg, IB_EVENT_RECORD *ev)
{
        /* XXX flesh out.  this seems largely for async errors */
        CERROR("type: %d code: %u\n", ev->EventType, ev->EventCode);
}
3274
3275 void
3276 kibnal_hca_callback (void *hca_arg, void *cq_arg)
3277 {
3278         unsigned long flags;
3279
3280         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
3281         kibnal_data.kib_ready = 1;
3282         wake_up(&kibnal_data.kib_sched_waitq);
3283         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
3284 }
3285
/* Scheduler thread: polls the single shared CQ and dispatches work
 * completions.  Only one scheduler polls at a time (kib_checking_cq is the
 * ownership flag, protected by kib_sched_lock); on a successful poll it
 * hands the "more work pending" hint to another scheduler before handling
 * the completion outside the lock.  Sleeps on kib_sched_waitq when the CQ
 * is empty (after re-arming it). */
int
kibnal_scheduler(void *arg)
{
        long               id = (long)arg;
        wait_queue_t       wait;
        char               name[16];
        FSTATUS            frc;
        FSTATUS            frc2;
        IB_WORK_COMPLETION wc;
        kib_rx_t          *rx;
        unsigned long      flags;
        __u64              rxseq = 0;
        int                busy_loops = 0;     /* forces periodic resched */

        snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);

        while (!kibnal_data.kib_shutdown) {
                /* Yield the CPU every IBNAL_RESCHED iterations so a busy
                 * scheduler can't hog it. */
                if (busy_loops++ >= IBNAL_RESCHED) {
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);

                        cfs_cond_resched();
                        busy_loops = 0;
                        
                        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
                }

                if (kibnal_data.kib_ready &&
                    !kibnal_data.kib_checking_cq) {
                        /* take ownership of completion polling */
                        kibnal_data.kib_checking_cq = 1;
                        /* Assume I'll exhaust the CQ */
                        kibnal_data.kib_ready = 0;
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);
                        
                        frc = iba_poll_cq(kibnal_data.kib_cq, &wc);
                        if (frc == FNOT_DONE) {
                                /* CQ empty */
                                frc2 = iba_rearm_cq(kibnal_data.kib_cq,
                                                    CQEventSelNextWC);
                                LASSERT (frc2 == FSUCCESS);
                        }
                        
                        if (frc == FSUCCESS &&
                            kibnal_wreqid2type(wc.WorkReqId) == IBNAL_WID_RX) {
                                rx = (kib_rx_t *)kibnal_wreqid2ptr(wc.WorkReqId);
                                
                                /* Grab the RX sequence number NOW before
                                 * anyone else can get an RX completion */
                                rxseq = rx->rx_conn->ibc_rxseq++;
                        }
                                
                        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
                        /* give up ownership of completion polling */
                        kibnal_data.kib_checking_cq = 0;

                        if (frc == FNOT_DONE)
                                continue;

                        LASSERT (frc == FSUCCESS);
                        /* Assume there's more: get another scheduler to check
                         * while I handle this completion... */

                        kibnal_data.kib_ready = 1;
                        wake_up(&kibnal_data.kib_sched_waitq);

                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);

                        /* Dispatch on the work-request type encoded in the
                         * completion's WorkReqId. */
                        switch (kibnal_wreqid2type(wc.WorkReqId)) {
                        case IBNAL_WID_RX:
                                kibnal_rx_complete(&wc, rxseq);
                                break;
                                
                        case IBNAL_WID_TX:
                                kibnal_tx_complete(&wc);
                                break;
                                
                        case IBNAL_WID_RDMA:
                                /* We only get RDMA completion notification if
                                 * it fails.  So we just ignore them completely
                                 * because...
                                 *
                                 * 1) If an RDMA fails, all subsequent work
                                 * items, including the final SEND will fail
                                 * too, so I'm still guaranteed to notice that
                                 * this connection is hosed.
                                 *
                                 * 2) It's positively dangerous to look inside
                                 * the tx descriptor obtained from an RDMA work
                                 * item.  As soon as I drop the kib_sched_lock,
                                 * I give a scheduler on another CPU a chance
                                 * to get the final SEND completion, so the tx
                                 * descriptor can get freed as I inspect it. */
                                CERROR ("RDMA failed: %d\n", wc.Status);
                                break;

                        default:
                                LBUG();
                        }
                        
                        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
                        continue;
                }

                /* Nothing to do; sleep... */

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue_exclusive(&kibnal_data.kib_sched_waitq, &wait);
                spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                       flags);

                schedule();

                remove_wait_queue(&kibnal_data.kib_sched_waitq, &wait);
                set_current_state(TASK_RUNNING);
                spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
        }

        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);

        kibnal_thread_fini();
        return (0);
}