Whamcloud - gitweb
This update includes hex error IDs and checksum calculation for console error messages.
[fs/lustre-release.git] / lnet / klnds / iiblnd / iiblnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include "iiblnd.h"
25
/* Debug helper: dump 'len' bytes at 'ptr', labelled 'string', via printk.
 *
 * Currently compiled out on purpose (it was previously disabled by an
 * unconditional 'return' at the top, which left ~20 lines of dead code
 * and unused-variable warnings behind).  The body is now under '#if 0'
 * so the no-op behavior is explicit; flip to '#if 1' to re-enable. */
void
hexdump(char *string, void *ptr, int len)
{
#if 0
        unsigned char *c = ptr;
        int i;

        /* sanity-limit to avoid flooding the console with a bad length */
        if (len < 0 || len > 2048)  {
                printk("XXX what the hell? %d\n",len);
                return;
        }

        printk("%d bytes of '%s' from 0x%p\n", len, string, ptr);

        for (i = 0; i < len;) {
                printk("%02x",*(c++));
                i++;
                if (!(i & 15)) {        /* newline every 16 bytes */
                        printk("\n");
                } else if (!(i&1)) {    /* space every 2 bytes */
                        printk(" ");
                }
        }

        /* terminate the last partial line */
        if(len & 15) {
                printk("\n");
        }
#endif
}
55
56 void
57 kibnal_tx_done (kib_tx_t *tx)
58 {
59         lnet_msg_t *lntmsg[2];
60         int         rc = tx->tx_status;
61         int         i;
62
63         LASSERT (!in_interrupt());
64         LASSERT (!tx->tx_queued);               /* mustn't be queued for sending */
65         LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting sent callback */
66         LASSERT (!tx->tx_waiting);              /* mustn't be awaiting peer response */
67
68 #if IBNAL_USE_FMR
69         /* Handle unmapping if required */
70 #endif
71         /* tx may have up to 2 lnet msgs to finalise */
72         lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
73         lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
74         
75         if (tx->tx_conn != NULL) {
76                 kibnal_conn_decref(tx->tx_conn);
77                 tx->tx_conn = NULL;
78         }
79
80         tx->tx_nwrq = 0;
81         tx->tx_status = 0;
82
83         spin_lock(&kibnal_data.kib_tx_lock);
84
85         list_add (&tx->tx_list, &kibnal_data.kib_idle_txs);
86
87         spin_unlock(&kibnal_data.kib_tx_lock);
88
89         /* delay finalize until my descs have been freed */
90         for (i = 0; i < 2; i++) {
91                 if (lntmsg[i] == NULL)
92                         continue;
93
94                 lnet_finalize (kibnal_data.kib_ni, lntmsg[i], rc);
95         }
96 }
97
98 kib_tx_t *
99 kibnal_get_idle_tx (void) 
100 {
101         kib_tx_t      *tx;
102         
103         spin_lock(&kibnal_data.kib_tx_lock);
104
105         if (list_empty (&kibnal_data.kib_idle_txs)) {
106                 spin_unlock(&kibnal_data.kib_tx_lock);
107                 return NULL;
108         }
109
110         tx = list_entry (kibnal_data.kib_idle_txs.next, kib_tx_t, tx_list);
111         list_del (&tx->tx_list);
112
113         /* Allocate a new completion cookie.  It might not be needed,
114          * but we've got a lock right now and we're unlikely to
115          * wrap... */
116         tx->tx_cookie = kibnal_data.kib_next_tx_cookie++;
117
118         spin_unlock(&kibnal_data.kib_tx_lock);
119
120         LASSERT (tx->tx_nwrq == 0);
121         LASSERT (!tx->tx_queued);
122         LASSERT (tx->tx_sending == 0);
123         LASSERT (!tx->tx_waiting);
124         LASSERT (tx->tx_status == 0);
125         LASSERT (tx->tx_conn == NULL);
126         LASSERT (tx->tx_lntmsg[0] == NULL);
127         LASSERT (tx->tx_lntmsg[1] == NULL);
128         
129         return tx;
130 }
131
/* (Re)post a receive buffer on 'rx's connection.
 *
 * credit       - return one flow-control credit to the peer for the buffer
 * rsrvd_credit - return one pre-reserved RDMA-reply credit (newer protocol
 *                versions only; see the LASSERT below)
 *
 * Returns 0 on success, or if the conn is already disconnecting (in which
 * case the rx's ref on the conn is dropped and the rx is not reposted).
 * Returns -EIO if the hardware post fails; the connection is then closed
 * and the conn ref dropped. */
int
kibnal_post_rx (kib_rx_t *rx, int credit, int rsrvd_credit)
{
        kib_conn_t   *conn = rx->rx_conn;
        int           rc = 0;
        FSTATUS       frc;

        LASSERT (!in_interrupt());
        /* old peers don't reserve rxs for RDMA replies */
        LASSERT (!rsrvd_credit ||
                 conn->ibc_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
        
        /* single DMA segment covering the whole pre-mapped message buffer */
        rx->rx_gl = (IB_LOCAL_DATASEGMENT) {
                .Address = rx->rx_hca_msg,
                .Lkey    = kibnal_data.kib_whole_mem.md_lkey,
                .Length  = IBNAL_MSG_SIZE,
        };

        rx->rx_wrq = (IB_WORK_REQ2) {
                .Next          = NULL,
                /* work-request id encodes the rx pointer plus an RX tag,
                 * decoded again in the completion handler */
                .WorkReqId     = kibnal_ptr2wreqid(rx, IBNAL_WID_RX),
                .MessageLen    = IBNAL_MSG_SIZE,
                .DSList        = &rx->rx_gl,
                .DSListDepth   = 1,
                .Operation     = WROpRecv,
        };

        LASSERT (conn->ibc_state >= IBNAL_CONN_CONNECTING);
        LASSERT (rx->rx_nob >= 0);              /* not posted */

        CDEBUG(D_NET, "posting rx [%d %x "LPX64"]\n", 
               rx->rx_wrq.DSList->Length,
               rx->rx_wrq.DSList->Lkey,
               rx->rx_wrq.DSList->Address);

        if (conn->ibc_state > IBNAL_CONN_ESTABLISHED) {
                /* No more posts for this rx; so lose its ref */
                kibnal_conn_decref(conn);
                return 0;
        }
        
        rx->rx_nob = -1;                        /* flag posted */
        mb();   /* order the 'posted' flag before the hardware post */

        frc = iba_post_recv2(conn->ibc_qp, &rx->rx_wrq, NULL);
        if (frc == FSUCCESS) {
                if (credit || rsrvd_credit) {
                        spin_lock(&conn->ibc_lock);

                        if (credit)
                                conn->ibc_outstanding_credits++;
                        if (rsrvd_credit)
                                conn->ibc_reserved_credits++;

                        spin_unlock(&conn->ibc_lock);

                        /* returned credits may unblock queued sends */
                        kibnal_check_sends(conn);
                }
                return 0;
        }
        
        CERROR ("post rx -> %s failed %d\n", 
                libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
        rc = -EIO;
        kibnal_close_conn(rx->rx_conn, rc);
        /* No more posts for this rx; so lose its ref */
        kibnal_conn_decref(conn);
        return rc;
}
201
202 int
203 kibnal_post_receives (kib_conn_t *conn)
204 {
205         int    i;
206         int    rc;
207
208         LASSERT (conn->ibc_state == IBNAL_CONN_CONNECTING);
209
210         for (i = 0; i < IBNAL_RX_MSGS; i++) {
211                 /* +1 ref for rx desc.  This ref remains until kibnal_post_rx
212                  * fails (i.e. actual failure or we're disconnecting) */
213                 kibnal_conn_addref(conn);
214                 rc = kibnal_post_rx (&conn->ibc_rxs[i], 0, 0);
215                 if (rc != 0)
216                         return rc;
217         }
218
219         return 0;
220 }
221
222 kib_tx_t *
223 kibnal_find_waiting_tx_locked(kib_conn_t *conn, int txtype, __u64 cookie)
224 {
225         struct list_head   *tmp;
226         
227         list_for_each(tmp, &conn->ibc_active_txs) {
228                 kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);
229                 
230                 LASSERT (!tx->tx_queued);
231                 LASSERT (tx->tx_sending != 0 || tx->tx_waiting);
232
233                 if (tx->tx_cookie != cookie)
234                         continue;
235
236                 if (tx->tx_waiting &&
237                     tx->tx_msg->ibm_type == txtype)
238                         return tx;
239
240                 CWARN("Bad completion: %swaiting, type %x (wanted %x)\n",
241                       tx->tx_waiting ? "" : "NOT ",
242                       tx->tx_msg->ibm_type, txtype);
243         }
244         return NULL;
245 }
246
247 void
248 kibnal_handle_completion(kib_conn_t *conn, int txtype, int status, __u64 cookie)
249 {
250         kib_tx_t    *tx;
251         int          idle;
252
253         spin_lock(&conn->ibc_lock);
254
255         tx = kibnal_find_waiting_tx_locked(conn, txtype, cookie);
256         if (tx == NULL) {
257                 spin_unlock(&conn->ibc_lock);
258
259                 CWARN("Unmatched completion type %x cookie "LPX64" from %s\n",
260                       txtype, cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
261                 kibnal_close_conn (conn, -EPROTO);
262                 return;
263         }
264
265         if (tx->tx_status == 0) {               /* success so far */
266                 if (status < 0) {               /* failed? */
267                         tx->tx_status = status;
268                 } else if (txtype == IBNAL_MSG_GET_REQ) {
269                         lnet_set_reply_msg_len(kibnal_data.kib_ni,
270                                                tx->tx_lntmsg[1], status);
271                 }
272         }
273         
274         tx->tx_waiting = 0;
275
276         idle = !tx->tx_queued && (tx->tx_sending == 0);
277         if (idle)
278                 list_del(&tx->tx_list);
279
280         spin_unlock(&conn->ibc_lock);
281         
282         if (idle)
283                 kibnal_tx_done(tx);
284 }
285
286 void
287 kibnal_send_completion (kib_conn_t *conn, int type, int status, __u64 cookie) 
288 {
289         kib_tx_t    *tx = kibnal_get_idle_tx();
290         
291         if (tx == NULL) {
292                 CERROR("Can't get tx for completion %x for %s\n",
293                        type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
294                 return;
295         }
296         
297         tx->tx_msg->ibm_u.completion.ibcm_status = status;
298         tx->tx_msg->ibm_u.completion.ibcm_cookie = cookie;
299         kibnal_init_tx_msg(tx, type, sizeof(kib_completion_msg_t));
300         
301         kibnal_queue_tx(tx, conn);
302 }
303
/* Dispatch a received (already unpacked and validated) message.
 *
 * Handles the flow-control credits carried by every message, then acts on
 * the message type.  On a protocol error the connection is closed.  The rx
 * buffer is reposted unless lnet_parse() took ownership of it (rc >= 0 for
 * the IMMEDIATE/PUT_REQ/GET_REQ cases); 'rsrvd_credit' tracks whether the
 * buffer being returned was pre-reserved for an RDMA reply. */
void
kibnal_handle_rx (kib_rx_t *rx)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        int           credits = msg->ibm_credits;
        kib_tx_t     *tx;
        int           rc = 0;
        int           repost = 1;
        int           rsrvd_credit = 0;
        int           rc2;

        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);

        CDEBUG (D_NET, "Received %x[%d] from %s\n",
                msg->ibm_type, credits, libcfs_nid2str(conn->ibc_peer->ibp_nid));
        
        if (credits != 0) {
                /* Have I received credits that will let me send? */
                spin_lock(&conn->ibc_lock);
                conn->ibc_credits += credits;
                spin_unlock(&conn->ibc_lock);

                kibnal_check_sends(conn);
        }

        switch (msg->ibm_type) {
        default:
                CERROR("Bad IBNAL message type %x from %s\n",
                       msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                rc = -EPROTO;
                break;

        case IBNAL_MSG_NOOP:
                /* credit-return only; nothing further to do */
                break;

        case IBNAL_MSG_IMMEDIATE:
                rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.immediate.ibim_hdr,
                                msg->ibm_srcnid, rx, 0);
                repost = rc < 0;                /* repost on error */
                break;
                
        case IBNAL_MSG_PUT_REQ:
                rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.putreq.ibprm_hdr,
                                msg->ibm_srcnid, rx, 1);
                repost = rc < 0;                /* repost on error */
                break;

        case IBNAL_MSG_PUT_NAK:
                rsrvd_credit = 1;               /* rdma reply (was pre-reserved) */

                CWARN ("PUT_NACK from %s\n", libcfs_nid2str(conn->ibc_peer->ibp_nid));
                /* peer refused the PUT; complete the waiting PUT_REQ tx */
                kibnal_handle_completion(conn, IBNAL_MSG_PUT_REQ, 
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBNAL_MSG_PUT_ACK:
                rsrvd_credit = 1;               /* rdma reply (was pre-reserved) */

                /* find (and detach) the PUT_REQ tx this ACK answers */
                spin_lock(&conn->ibc_lock);
                tx = kibnal_find_waiting_tx_locked(conn, IBNAL_MSG_PUT_REQ,
                                                   msg->ibm_u.putack.ibpam_src_cookie);
                if (tx != NULL)
                        list_del(&tx->tx_list);
                spin_unlock(&conn->ibc_lock);

                if (tx == NULL) {
                        CERROR("Unmatched PUT_ACK from %s\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        rc = -EPROTO;
                        break;
                }

                LASSERT (tx->tx_waiting);
                /* CAVEAT EMPTOR: I could be racing with tx_complete, but...
                 * (a) I can overwrite tx_msg since my peer has received it!
                 * (b) tx_waiting set tells tx_complete() it's not done. */

                tx->tx_nwrq = 0;                /* overwrite PUT_REQ */

                /* set up the RDMA write described by the peer's descriptor */
                rc2 = kibnal_init_rdma(tx, IBNAL_MSG_PUT_DONE, 
                                       kibnal_rd_size(&msg->ibm_u.putack.ibpam_rd),
                                       &msg->ibm_u.putack.ibpam_rd,
                                       msg->ibm_u.putack.ibpam_dst_cookie);
                if (rc2 < 0)
                        CERROR("Can't setup rdma for PUT to %s: %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc2);

                spin_lock(&conn->ibc_lock);
                if (tx->tx_status == 0 && rc2 < 0)
                        tx->tx_status = rc2;
                tx->tx_waiting = 0;             /* clear waiting and queue atomically */
                kibnal_queue_tx_locked(tx, conn);
                spin_unlock(&conn->ibc_lock);
                break;
                
        case IBNAL_MSG_PUT_DONE:
                /* This buffer was pre-reserved by not returning the credit
                 * when the PUT_REQ's buffer was reposted, so I just return it
                 * now */
                kibnal_handle_completion(conn, IBNAL_MSG_PUT_ACK,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBNAL_MSG_GET_REQ:
                rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.get.ibgm_hdr,
                                msg->ibm_srcnid, rx, 1);
                repost = rc < 0;                /* repost on error */
                break;

        case IBNAL_MSG_GET_DONE:
                rsrvd_credit = 1;               /* rdma reply (was pre-reserved) */

                kibnal_handle_completion(conn, IBNAL_MSG_GET_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;
        }

        if (rc < 0)                             /* protocol error */
                kibnal_close_conn(conn, rc);

        if (repost) {
                if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD)
                        rsrvd_credit = 0;       /* peer isn't pre-reserving */

                kibnal_post_rx(rx, !rsrvd_credit, rsrvd_credit);
        }
}
435
/* Completion handler for a receive work request.
 *
 * Validates the hardware completion status, unpacks and sanity-checks the
 * message (peer identity, incarnation stamps, sequence number 'rxseq'),
 * then either dispatches it via kibnal_handle_rx(), parks it on the
 * early-rx list if the connection isn't fully established yet, or closes
 * the connection and drops the rx's conn ref on failure. */
void
kibnal_rx_complete (IB_WORK_COMPLETION *wc, __u64 rxseq)
{
        kib_rx_t     *rx = (kib_rx_t *)kibnal_wreqid2ptr(wc->WorkReqId);
        int           nob = wc->Length;
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        unsigned long flags;
        int           rc;
        int           err = -EIO;

        LASSERT (rx->rx_nob < 0);               /* was posted */
        rx->rx_nob = 0;                         /* isn't now */
        mb();   /* order the flag clear against the checks below */

        /* receives complete with error in any case after we've started
         * disconnecting */
        if (conn->ibc_state > IBNAL_CONN_ESTABLISHED)
                goto ignore;

        if (wc->Status != WRStatusSuccess) {
                CERROR("Rx from %s failed: %d\n", 
                       libcfs_nid2str(conn->ibc_peer->ibp_nid), wc->Status);
                goto failed;
        }

        rc = kibnal_unpack_msg(msg, conn->ibc_version, nob);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from %s\n",
                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                goto failed;
        }

        rx->rx_nob = nob;                       /* Now I know nob > 0 */
        mb();

        /* reject messages from the wrong peer or a stale incarnation */
        if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
            msg->ibm_dstnid != kibnal_data.kib_ni->ni_nid ||
            msg->ibm_srcstamp != conn->ibc_incarnation ||
            msg->ibm_dststamp != kibnal_data.kib_incarnation) {
                CERROR ("Stale rx from %s\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
                err = -ESTALE;
                goto failed;
        }

        if (msg->ibm_seq != rxseq) {
                CERROR ("Out-of-sequence rx from %s"
                        ": got "LPD64" but expected "LPD64"\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid),
                        msg->ibm_seq, rxseq);
                goto failed;
        }

        /* set time last known alive */
        kibnal_peer_alive(conn->ibc_peer);

        /* racing with connection establishment/teardown! */

        if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
                /* must check holding global lock to eliminate race */
                if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                        /* park until the conn is established */
                        list_add_tail(&rx->rx_list, &conn->ibc_early_rxs);
                        write_unlock_irqrestore(&kibnal_data.kib_global_lock, 
                                                flags);
                        return;
                }
                write_unlock_irqrestore(&kibnal_data.kib_global_lock, 
                                        flags);
        }
        kibnal_handle_rx(rx);
        return;
        
 failed:
        kibnal_close_conn(conn, err);
 ignore:
        /* Don't re-post rx & drop its ref on conn */
        kibnal_conn_decref(conn);
}
516
517 struct page *
518 kibnal_kvaddr_to_page (unsigned long vaddr)
519 {
520         struct page *page;
521
522         if (vaddr >= VMALLOC_START &&
523             vaddr < VMALLOC_END) {
524                 page = vmalloc_to_page ((void *)vaddr);
525                 LASSERT (page != NULL);
526                 return page;
527         }
528 #if CONFIG_HIGHMEM
529         if (vaddr >= PKMAP_BASE &&
530             vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE)) {
531                 /* No highmem pages only used for bulk (kiov) I/O */
532                 CERROR("find page for address in highmem\n");
533                 LBUG();
534         }
535 #endif
536         page = virt_to_page (vaddr);
537         LASSERT (page != NULL);
538         return page;
539 }
540
541 #if !IBNAL_USE_FMR
542 int
543 kibnal_append_rdfrag(kib_rdma_desc_t *rd, int active, struct page *page, 
544                      unsigned long page_offset, unsigned long len)
545 {
546         kib_rdma_frag_t *frag = &rd->rd_frags[rd->rd_nfrag];
547
548         if (rd->rd_nfrag >= IBNAL_MAX_RDMA_FRAGS) {
549                 CERROR ("Too many RDMA fragments\n");
550                 return -EMSGSIZE;
551         }
552
553         if (active) {
554                 if (rd->rd_nfrag == 0)
555                         rd->rd_key = kibnal_data.kib_whole_mem.md_lkey;
556         } else {
557                 if (rd->rd_nfrag == 0)
558                         rd->rd_key = kibnal_data.kib_whole_mem.md_rkey;
559         }
560
561         frag->rf_nob  = len;
562         frag->rf_addr = kibnal_data.kib_whole_mem.md_addr +
563                         lnet_page2phys(page) + page_offset;
564
565         CDEBUG(D_NET,"map key %x frag [%d]["LPX64" for %d]\n", 
566                rd->rd_key, rd->rd_nfrag, frag->rf_addr, frag->rf_nob);
567
568         rd->rd_nfrag++;
569         return 0;
570 }
571
/* Build an RDMA descriptor covering 'nob' bytes of the iovec, starting
 * 'offset' bytes in.  Fragments never cross a page boundary.  'active'
 * selects local vs remote key (see kibnal_append_rdfrag).
 * Returns 0, -EFAULT if a virtual address has no page, or -EMSGSIZE if
 * too many fragments are needed. */
int
kibnal_setup_rd_iov(kib_tx_t *tx, kib_rdma_desc_t *rd, int active,
                    unsigned int niov, struct iovec *iov, int offset, int nob)
                 
{
        int           fragnob;
        int           rc;
        unsigned long vaddr;
        struct page  *page;
        int           page_offset;

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT ((rd != tx->tx_rd) == !active);

        /* skip iovec entries wholly consumed by 'offset' */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        rd->rd_nfrag = 0;
        do {
                LASSERT (niov > 0);

                vaddr = ((unsigned long)iov->iov_base) + offset;
                page_offset = vaddr & (PAGE_SIZE - 1);
                page = kibnal_kvaddr_to_page(vaddr);
                if (page == NULL) {
                        CERROR ("Can't find page\n");
                        return -EFAULT;
                }

                /* bounded by remaining iov entry, remaining nob, and page end */
                fragnob = min((int)(iov->iov_len - offset), nob);
                fragnob = min(fragnob, (int)PAGE_SIZE - page_offset);

                rc = kibnal_append_rdfrag(rd, active, page, 
                                          page_offset, fragnob);
                if (rc != 0)
                        return rc;

                /* advance within this iov entry, or move to the next one */
                if (offset + fragnob < iov->iov_len) {
                        offset += fragnob;
                } else {
                        offset = 0;
                        iov++;
                        niov--;
                }
                nob -= fragnob;
        } while (nob > 0);
        
        return 0;
}
626
627 int
628 kibnal_setup_rd_kiov (kib_tx_t *tx, kib_rdma_desc_t *rd, int active,
629                       int nkiov, lnet_kiov_t *kiov, int offset, int nob)
630 {
631         int            fragnob;
632         int            rc;
633
634         CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
635
636         LASSERT (nob > 0);
637         LASSERT (nkiov > 0);
638         LASSERT ((rd != tx->tx_rd) == !active);
639
640         while (offset >= kiov->kiov_len) {
641                 offset -= kiov->kiov_len;
642                 nkiov--;
643                 kiov++;
644                 LASSERT (nkiov > 0);
645         }
646
647         rd->rd_nfrag = 0;
648         do {
649                 LASSERT (nkiov > 0);
650                 fragnob = min((int)(kiov->kiov_len - offset), nob);
651                 
652                 rc = kibnal_append_rdfrag(rd, active, kiov->kiov_page,
653                                           kiov->kiov_offset + offset,
654                                           fragnob);
655                 if (rc != 0)
656                         return rc;
657
658                 offset = 0;
659                 kiov++;
660                 nkiov--;
661                 nob -= fragnob;
662         } while (nob > 0);
663
664         return 0;
665 }
666 #else
/* Map the physical pages collected in tx->tx_pages for RDMA (FMR build).
 *
 * NOTE(review): this is effectively a stub -- the registration call is
 * commented out below and the function unconditionally returns -EINVAL,
 * so the FMR path cannot currently map anything.  Presumably awaiting a
 * port to the iba_* API; confirm before enabling IBNAL_USE_FMR. */
int
kibnal_map_tx (kib_tx_t *tx, kib_rdma_desc_t *rd, int active,
               int npages, unsigned long page_offset, int nob)
{
        IB_ACCESS_CONTROL access = {0,};
        FSTATUS           frc;

        LASSERT ((rd != tx->tx_rd) == !active);
        LASSERT (!tx->tx_md.md_active);
        LASSERT (tx->tx_md.md_fmrcount > 0);
        LASSERT (page_offset < PAGE_SIZE);
        LASSERT (npages >= (1 + ((page_offset + nob - 1)>>PAGE_SHIFT)));
        LASSERT (npages <= LNET_MAX_IOV);

        /* the passive side needs the peer to be able to write to us */
        if (!active) {
                // access.s.MWBindable = 1;
                access.s.LocalWrite = 1;
                access.s.RdmaWrite = 1;
        }

        /* Map the memory described by tx->tx_pages
        frc = iibt_register_physical_memory(kibnal_data.kib_hca,
                                            IBNAL_RDMA_BASE,
                                            tx->tx_pages, npages,
                                            page_offset,
                                            kibnal_data.kib_pd,
                                            access,
                                            &tx->tx_md.md_handle,
                                            &tx->tx_md.md_addr,
                                            &tx->tx_md.md_lkey,
                                            &tx->tx_md.md_rkey);
        */
        return -EINVAL;
}
701
/* FMR variant: collect the physical pages backing a single contiguous
 * iovec fragment into tx->tx_pages, then map them via kibnal_map_tx().
 * Unlike the non-FMR version, the payload must lie within ONE iovec
 * entry (-EMSGSIZE otherwise).  Returns -EFAULT if a vaddr has no page,
 * else whatever kibnal_map_tx() returns. */
int
kibnal_setup_rd_iov (kib_tx_t *tx, kib_rdma_desc_t *rd, int active,
                     unsigned int niov, struct iovec *iov, int offset, int nob)
                 
{
        int           resid;
        int           fragnob;
        struct page  *page;
        int           npages;
        unsigned long page_offset;
        unsigned long vaddr;

        LASSERT (nob > 0);
        LASSERT (niov > 0);

        /* skip iovec entries wholly consumed by 'offset' */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR ("Can't map multiple vaddr fragments\n");
                return (-EMSGSIZE);
        }

        vaddr = ((unsigned long)iov->iov_base) + offset;
        
        page_offset = vaddr & (PAGE_SIZE - 1);
        resid = nob;
        npages = 0;

        do {
                LASSERT (npages < LNET_MAX_IOV);

                page = kibnal_kvaddr_to_page(vaddr);
                if (page == NULL) {
                        CERROR("Can't find page for %lu\n", vaddr);
                        return -EFAULT;
                }

                tx->tx_pages[npages++] = lnet_page2phys(page);

                /* advance to the next page boundary; the final iteration
                 * may overshoot 'resid', which simply terminates the loop */
                fragnob = PAGE_SIZE - (vaddr & (PAGE_SIZE - 1));
                vaddr += fragnob;
                resid -= fragnob;

        } while (resid > 0);

        return kibnal_map_tx(tx, rd, active, npages, page_offset, nob);
}
754
/* FMR variant: collect the physical pages backing a kiov payload into
 * tx->tx_pages and map them via kibnal_map_tx().  The payload must be
 * virtually contiguous: every page after the first must start at offset 0
 * and every non-final page must be full, else -EINVAL.  Returns whatever
 * kibnal_map_tx() returns on success. */
int
kibnal_setup_rd_kiov (kib_tx_t *tx, kib_rdma_desc_t *rd, int active,
                      int nkiov, lnet_kiov_t *kiov, int offset, int nob)
{
        int            resid;
        int            npages;
        unsigned long  page_offset;
        
        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (nkiov <= LNET_MAX_IOV);
        LASSERT (!tx->tx_md.md_active);
        LASSERT ((rd != tx->tx_rd) == !active);

        /* skip kiov entries wholly consumed by 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        page_offset = kiov->kiov_offset + offset;
        
        /* 'resid' counts from the start of the first page used, so it
         * includes the leading 'offset' bytes */
        resid = offset + nob;
        npages = 0;

        do {
                LASSERT (npages < LNET_MAX_IOV);
                LASSERT (nkiov > 0);

                if ((npages > 0 && kiov->kiov_offset != 0) ||
                    (resid > kiov->kiov_len && 
                     (kiov->kiov_offset + kiov->kiov_len) != PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR ("Can't make payload contiguous in I/O VM:"
                                "page %d, offset %d, len %d \n",
                                npages, kiov->kiov_offset, kiov->kiov_len);
                        
                        return -EINVAL;
                }

                tx->tx_pages[npages++] = lnet_page2phys(kiov->kiov_page);
                resid -= kiov->kiov_len;
                kiov++;
                nkiov--;
        } while (resid > 0);

        return kibnal_map_tx(tx, rd, active, npages, page_offset, nob);
}
806 #endif
807
808 kib_conn_t *
809 kibnal_find_conn_locked (kib_peer_t *peer)
810 {
811         struct list_head *tmp;
812
813         /* just return the first connection */
814         list_for_each (tmp, &peer->ibp_conns) {
815                 return (list_entry(tmp, kib_conn_t, ibc_list));
816         }
817
818         return (NULL);
819 }
820
/* Push queued transmits on 'conn' for as long as credits and send-queue
 * space allow.  Called whenever the situation may have changed: a tx was
 * queued, credits arrived from the peer, or a send completed.  Takes
 * ibc_lock; drops it transiently to allocate a NOOP tx, and drops it for
 * good on the error path after a failed post. */
void
kibnal_check_sends (kib_conn_t *conn)
{
        kib_tx_t       *tx;
        FSTATUS         frc;
        int             rc;
        int             consume_cred;
        int             done;

        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);

        spin_lock(&conn->ibc_lock);

        LASSERT (conn->ibc_nsends_posted <=
                *kibnal_tunables.kib_concurrent_sends);
        LASSERT (conn->ibc_reserved_credits >= 0);

        /* Promote txs waiting on reserved credits to the normal send queue
         * as reserved credits become available */
        while (conn->ibc_reserved_credits > 0 &&
               !list_empty(&conn->ibc_tx_queue_rsrvd)) {
                LASSERT (conn->ibc_version != 
                         IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
                tx = list_entry(conn->ibc_tx_queue_rsrvd.next,
                                kib_tx_t, tx_list);
                list_del(&tx->tx_list);
                list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
                conn->ibc_reserved_credits--;
        }

        /* Nothing queued, but the peer is owed enough credits (or a
         * keepalive is due)?  Queue a NOOP message just to carry them. */
        if (list_empty(&conn->ibc_tx_queue) &&
            list_empty(&conn->ibc_tx_queue_nocred) &&
            (conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER ||
             kibnal_send_keepalive(conn))) {
                spin_unlock(&conn->ibc_lock);

                /* NULL here just means no NOOP gets sent this time */
                tx = kibnal_get_idle_tx();
                if (tx != NULL)
                        kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);

                spin_lock(&conn->ibc_lock);

                if (tx != NULL)
                        kibnal_queue_tx_locked(tx, conn);
        }

        for (;;) {
                /* txs that don't consume a credit take priority */
                if (!list_empty(&conn->ibc_tx_queue_nocred)) {
                        LASSERT (conn->ibc_version != 
                                 IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
                        tx = list_entry (conn->ibc_tx_queue_nocred.next, 
                                         kib_tx_t, tx_list);
                        consume_cred = 0;
                } else if (!list_empty (&conn->ibc_tx_queue)) {
                        tx = list_entry (conn->ibc_tx_queue.next, 
                                         kib_tx_t, tx_list);
                        consume_cred = 1;
                } else {
                        /* nothing waiting */
                        break;
                }

                LASSERT (tx->tx_queued);
                /* We rely on this for QP sizing */
                LASSERT (tx->tx_nwrq > 0 && tx->tx_nwrq <= 1 + IBNAL_MAX_RDMA_FRAGS);

                LASSERT (conn->ibc_outstanding_credits >= 0);
                LASSERT (conn->ibc_outstanding_credits <= IBNAL_MSG_QUEUE_SIZE);
                LASSERT (conn->ibc_credits >= 0);
                LASSERT (conn->ibc_credits <= IBNAL_MSG_QUEUE_SIZE);

                /* send queue full: wait for completions to make room */
                if (conn->ibc_nsends_posted ==
                    *kibnal_tunables.kib_concurrent_sends) {
                        /* We've got some tx completions outstanding... */
                        CDEBUG(D_NET, "%s: posted enough\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        break;
                }

                if (consume_cred) {
                        if (conn->ibc_credits == 0) {   /* no credits */
                                CDEBUG(D_NET, "%s: no credits\n",
                                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                                break;
                        }

                        if (conn->ibc_credits == 1 &&   /* last credit reserved for */
                            conn->ibc_outstanding_credits == 0) { /* giving back credits */
                                CDEBUG(D_NET, "%s: not using last credit\n",
                                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                                break;
                        }
                }

                list_del (&tx->tx_list);
                tx->tx_queued = 0;

                /* NB don't drop ibc_lock before bumping tx_sending */

                /* A NOOP is redundant if other queued traffic will carry
                 * the credits, or if too few credits are owed to bother;
                 * recycle it without posting */
                if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
                    (!list_empty(&conn->ibc_tx_queue) ||
                     !list_empty(&conn->ibc_tx_queue_nocred) ||
                     (conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER &&
                      !kibnal_send_keepalive(conn)))) {
                        /* redundant NOOP */
                        spin_unlock(&conn->ibc_lock);
                        kibnal_tx_done(tx);
                        spin_lock(&conn->ibc_lock);
                        CDEBUG(D_NET, "%s: redundant noop\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        continue;
                }

                /* stamp the message with the credits being returned, the
                 * peer/incarnation and the connection tx sequence number */
                kibnal_pack_msg(tx->tx_msg, conn->ibc_version,
                                conn->ibc_outstanding_credits,
                                conn->ibc_peer->ibp_nid, conn->ibc_incarnation,
                                conn->ibc_txseq);

                conn->ibc_txseq++;
                conn->ibc_outstanding_credits = 0;
                conn->ibc_nsends_posted++;
                if (consume_cred)
                        conn->ibc_credits--;

                /* CAVEAT EMPTOR!  This tx could be the PUT_DONE of an RDMA
                 * PUT.  If so, it was first queued here as a PUT_REQ, sent and
                 * stashed on ibc_active_txs, matched by an incoming PUT_ACK,
                 * and then re-queued here.  It's (just) possible that
                 * tx_sending is non-zero if we've not done the tx_complete() from
                 * the first send; hence the ++ rather than = below. */
                tx->tx_sending++;

                list_add (&tx->tx_list, &conn->ibc_active_txs);

                LASSERT (tx->tx_nwrq > 0);

                rc = 0;
                frc = FSUCCESS;
                if (conn->ibc_state != IBNAL_CONN_ESTABLISHED) {
                        rc = -ECONNABORTED;
                } else {
                        frc = iba_post_send2(conn->ibc_qp, tx->tx_wrq, NULL);
                        if (frc != FSUCCESS)
                                rc = -EIO;
                }

                conn->ibc_last_send = jiffies;

                if (rc != 0) {
                        /* Post failed: undo the credit/queue accounting
                         * above, fail the tx and close the connection.
                         * NB credits are transferred in the actual
                         * message, which can only be the last work item */
                        conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
                        if (consume_cred)
                                conn->ibc_credits++;
                        conn->ibc_nsends_posted--;

                        tx->tx_status = rc;
                        tx->tx_waiting = 0;
                        tx->tx_sending--;

                        /* only free the tx when its last send reference goes */
                        done = (tx->tx_sending == 0);
                        if (done)
                                list_del (&tx->tx_list);

                        spin_unlock(&conn->ibc_lock);

                        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
                                CERROR ("Error %d posting transmit to %s\n", 
                                        frc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        else
                                CDEBUG (D_NET, "Error %d posting transmit to %s\n",
                                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));

                        kibnal_close_conn (conn, rc);

                        if (done)
                                kibnal_tx_done (tx);
                        return;
                }
        }

        spin_unlock(&conn->ibc_lock);
}
1002
/* Handle a send work completion 'wc' for a tx.  Whoever makes the tx idle
 * (no sends in flight, not waiting for the peer, not re-queued) frees it;
 * a failed completion additionally closes the connection. */
void
kibnal_tx_complete (IB_WORK_COMPLETION *wc)
{
        kib_tx_t     *tx = (kib_tx_t *)kibnal_wreqid2ptr(wc->WorkReqId);
        kib_conn_t   *conn = tx->tx_conn;
        int           failed = wc->Status != WRStatusSuccess;
        int           idle;

        CDEBUG(D_NET, "%s: sending %d nwrq %d status %d\n", 
               libcfs_nid2str(conn->ibc_peer->ibp_nid),
               tx->tx_sending, tx->tx_nwrq, wc->Status);

        LASSERT (tx->tx_sending > 0);

        /* First failure seen on an established connection: log it (and,
         * with KIBLND_DETAILED_DEBUG, dump the work requests and the lnet
         * message fragments for post-mortem analysis) */
        if (failed &&
            tx->tx_status == 0 &&
            conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
#if KIBLND_DETAILED_DEBUG
                int                   i;
                IB_WORK_REQ2         *wrq = &tx->tx_wrq[0];
                IB_LOCAL_DATASEGMENT *gl = &tx->tx_gl[0];
                lnet_msg_t           *lntmsg = tx->tx_lntmsg[0];
#endif
                CDEBUG(D_NETERROR, "tx -> %s type %x cookie "LPX64
                       " sending %d waiting %d failed %d nwrk %d\n", 
                       libcfs_nid2str(conn->ibc_peer->ibp_nid),
                       tx->tx_msg->ibm_type, tx->tx_cookie,
                       tx->tx_sending, tx->tx_waiting, wc->Status,
                       tx->tx_nwrq);
#if KIBLND_DETAILED_DEBUG
                /* dump every work request in this tx's chain */
                for (i = 0; i < tx->tx_nwrq; i++, wrq++, gl++) {
                        switch (wrq->Operation) {
                        default:
                                CDEBUG(D_NETERROR, "    [%3d] Addr %p Next %p OP %d "
                                       "DSList %p(%p)/%d: "LPX64"/%d K %x\n",
                                       i, wrq, wrq->Next, wrq->Operation,
                                       wrq->DSList, gl, wrq->DSListDepth,
                                       gl->Address, gl->Length, gl->Lkey);
                                break;
                        case WROpSend:
                                CDEBUG(D_NETERROR, "    [%3d] Addr %p Next %p SEND "
                                       "DSList %p(%p)/%d: "LPX64"/%d K %x\n",
                                       i, wrq, wrq->Next, 
                                       wrq->DSList, gl, wrq->DSListDepth,
                                       gl->Address, gl->Length, gl->Lkey);
                                break;
                        case WROpRdmaWrite:
                                CDEBUG(D_NETERROR, "    [%3d] Addr %p Next %p DMA "
                                       "DSList: %p(%p)/%d "LPX64"/%d K %x -> "
                                       LPX64" K %x\n",
                                       i, wrq, wrq->Next, 
                                       wrq->DSList, gl, wrq->DSListDepth,
                                       gl->Address, gl->Length, gl->Lkey,
                                       wrq->Req.SendRC.RemoteDS.Address,
                                       wrq->Req.SendRC.RemoteDS.Rkey);
                                break;
                        }
                }

                /* for RDMA completions, also dump the local rdma
                 * descriptor and the lnet message's fragment list */
                switch (tx->tx_msg->ibm_type) {
                default:
                        CDEBUG(D_NETERROR, "  msg type %x %p/%d, No RDMA\n", 
                               tx->tx_msg->ibm_type, 
                               tx->tx_msg, tx->tx_msg->ibm_nob);
                        break;

                case IBNAL_MSG_PUT_DONE:
                case IBNAL_MSG_GET_DONE:
                        CDEBUG(D_NETERROR, "  msg type %x %p/%d, RDMA key %x frags %d...\n", 
                               tx->tx_msg->ibm_type, 
                               tx->tx_msg, tx->tx_msg->ibm_nob,
                               tx->tx_rd->rd_key, tx->tx_rd->rd_nfrag);
                        for (i = 0; i < tx->tx_rd->rd_nfrag; i++)
                                CDEBUG(D_NETERROR, "    [%d] "LPX64"/%d\n", i,
                                       tx->tx_rd->rd_frags[i].rf_addr,
                                       tx->tx_rd->rd_frags[i].rf_nob);
                        if (lntmsg == NULL) {
                                CDEBUG(D_NETERROR, "  No lntmsg\n");
                        } else if (lntmsg->msg_iov != NULL) {
                                CDEBUG(D_NETERROR, "  lntmsg in %d VIRT frags...\n", 
                                       lntmsg->msg_niov);
                                for (i = 0; i < lntmsg->msg_niov; i++)
                                        CDEBUG(D_NETERROR, "    [%d] %p/%d\n", i,
                                               lntmsg->msg_iov[i].iov_base,
                                               lntmsg->msg_iov[i].iov_len);
                        } else if (lntmsg->msg_kiov != NULL) {
                                CDEBUG(D_NETERROR, "  lntmsg in %d PAGE frags...\n", 
                                       lntmsg->msg_niov);
                                for (i = 0; i < lntmsg->msg_niov; i++)
                                        CDEBUG(D_NETERROR, "    [%d] %p+%d/%d\n", i,
                                               lntmsg->msg_kiov[i].kiov_page,
                                               lntmsg->msg_kiov[i].kiov_offset,
                                               lntmsg->msg_kiov[i].kiov_len);
                        } else {
                                CDEBUG(D_NETERROR, "  lntmsg in %d frags\n", 
                                       lntmsg->msg_niov);
                        }

                        break;
                }
#endif
        }

        spin_lock(&conn->ibc_lock);

        /* I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'. */

        tx->tx_sending--;
        conn->ibc_nsends_posted--;

        if (failed) {
                tx->tx_waiting = 0;             /* don't wait for a peer reply */
                tx->tx_status = -EIO;
        }

        idle = (tx->tx_sending == 0) &&         /* This is the final callback */
               !tx->tx_waiting &&               /* Not waiting for peer */
               !tx->tx_queued;                  /* Not re-queued (PUT_DONE) */
        if (idle)
                list_del(&tx->tx_list);

        kibnal_conn_addref(conn);               /* 1 ref for me.... */

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kibnal_tx_done (tx);

        if (failed) {
                kibnal_close_conn (conn, -EIO);
        } else {
                /* peer demonstrably alive; see if more sends can go out */
                kibnal_peer_alive(conn->ibc_peer);
                kibnal_check_sends(conn);
        }

        kibnal_conn_decref(conn);               /* ...until here */
}
1141
1142 void
1143 kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob)
1144 {
1145         IB_LOCAL_DATASEGMENT *gl = &tx->tx_gl[tx->tx_nwrq];
1146         IB_WORK_REQ2         *wrq = &tx->tx_wrq[tx->tx_nwrq];
1147         int                   nob = offsetof (kib_msg_t, ibm_u) + body_nob;
1148
1149         LASSERT (tx->tx_nwrq >= 0 && 
1150                  tx->tx_nwrq < (1 + IBNAL_MAX_RDMA_FRAGS));
1151         LASSERT (nob <= IBNAL_MSG_SIZE);
1152
1153         kibnal_init_msg(tx->tx_msg, type, body_nob);
1154
1155         *gl = (IB_LOCAL_DATASEGMENT) {
1156                 .Address = tx->tx_hca_msg,
1157                 .Length  = IBNAL_MSG_SIZE,
1158                 .Lkey    = kibnal_data.kib_whole_mem.md_lkey,
1159         };
1160
1161         wrq->Next           = NULL;             /* This is the last one */
1162
1163         wrq->WorkReqId      = kibnal_ptr2wreqid(tx, IBNAL_WID_TX);
1164         wrq->Operation      = WROpSend;
1165         wrq->DSList         = gl;
1166         wrq->DSListDepth    = 1;
1167         wrq->MessageLen     = nob;
1168         wrq->Req.SendRC.ImmediateData  = 0;
1169         wrq->Req.SendRC.Options.s.SolicitedEvent         = 1;
1170         wrq->Req.SendRC.Options.s.SignaledCompletion     = 1;
1171         wrq->Req.SendRC.Options.s.ImmediateData          = 0;
1172         wrq->Req.SendRC.Options.s.Fence                  = 0; 
1173         /* fence only needed on RDMA reads */
1174         
1175         tx->tx_nwrq++;
1176 }
1177
/* Set up tx's work requests for an RDMA write of 'nob' bytes from the
 * local descriptor tx->tx_rd to the peer's descriptor 'dstrd', then append
 * the completion message ('type' is GET_DONE or PUT_DONE) carrying
 * 'dstcookie'.  Returns nob on success or a -ve errno; on error no RDMA
 * work requests remain (tx_nwrq reset to 0) and only the completion
 * message, reporting the failure status, is sent.
 * NB the non-FMR path modifies the frags in both tx->tx_rd and 'dstrd'. */
int
kibnal_init_rdma (kib_tx_t *tx, int type, int nob,
                  kib_rdma_desc_t *dstrd, __u64 dstcookie)
{
        kib_msg_t            *ibmsg = tx->tx_msg;
        kib_rdma_desc_t      *srcrd = tx->tx_rd;
        IB_LOCAL_DATASEGMENT *gl;
        IB_WORK_REQ2         *wrq;
        int                   rc;

#if IBNAL_USE_FMR
        /* FMR presents each side as one contiguous region, so a single
         * RDMA-write work request covers the whole transfer */
        LASSERT (tx->tx_nwrq == 0);

        gl = &tx->tx_gl[0];
        gl->Length  = nob;
        gl->Address = srcrd->rd_addr;
        gl->Lkey    = srcrd->rd_key;

        wrq = &tx->tx_wrq[0];

        /* Next chains to the completion-message WR that
         * kibnal_init_tx_msg() below fills in (it sets Next = NULL) */
        wrq->Next           = wrq + 1;
        wrq->WorkReqId      = kibnal_ptr2wreqid(tx, IBNAL_WID_RDMA);
        wrq->Operation      = WROpRdmaWrite;
        wrq->DSList         = gl;
        wrq->DSListDepth    = 1;
        wrq->MessageLen     = nob;

        wrq->Req.SendRC.ImmediateData                = 0;
        wrq->Req.SendRC.Options.s.SolicitedEvent     = 0;
        wrq->Req.SendRC.Options.s.SignaledCompletion = 0;
        wrq->Req.SendRC.Options.s.ImmediateData      = 0;
        wrq->Req.SendRC.Options.s.Fence              = 0; 

        wrq->Req.SendRC.RemoteDS.Address = dstrd->rd_addr;
        wrq->Req.SendRC.RemoteDS.Rkey    = dstrd->rd_key;

        tx->tx_nwrq = 1;
        rc = nob;
#else
        /* CAVEAT EMPTOR: this 'consumes' the frags in 'dstrd' */
        int              resid = nob;           /* bytes still to cover */
        kib_rdma_frag_t *srcfrag;
        int              srcidx;
        kib_rdma_frag_t *dstfrag;
        int              dstidx;
        int              wrknob;

        /* Called by scheduler */
        LASSERT (!in_interrupt());

        LASSERT (type == IBNAL_MSG_GET_DONE ||
                 type == IBNAL_MSG_PUT_DONE);

        srcidx = dstidx = 0;
        srcfrag = &srcrd->rd_frags[0];
        dstfrag = &dstrd->rd_frags[0];
        rc = resid;

        /* Walk the src and dst frag lists in step, emitting one
         * RDMA-write work request per overlap of the current frags */
        while (resid > 0) {
                if (srcidx >= srcrd->rd_nfrag) {
                        CERROR("Src buffer exhausted: %d frags\n", srcidx);
                        rc = -EPROTO;
                        break;
                }

                if (dstidx == dstrd->rd_nfrag) {
                        CERROR("Dst buffer exhausted: %d frags\n", dstidx);
                        rc = -EPROTO;
                        break;
                }

                if (tx->tx_nwrq == IBNAL_MAX_RDMA_FRAGS) {
                        CERROR("RDMA too fragmented: %d/%d src %d/%d dst frags\n",
                               srcidx, srcrd->rd_nfrag,
                               dstidx, dstrd->rd_nfrag);
                        rc = -EMSGSIZE;
                        break;
                }

                /* this WR covers the smaller of: what's left of the src
                 * frag, what's left of the dst frag, and the residual */
                wrknob = MIN(MIN(srcfrag->rf_nob, dstfrag->rf_nob), resid);

                gl = &tx->tx_gl[tx->tx_nwrq];
                gl->Length  = wrknob;
                gl->Address = srcfrag->rf_addr;
                gl->Lkey    = srcrd->rd_key;

                wrq = &tx->tx_wrq[tx->tx_nwrq];

                /* chained; kibnal_init_tx_msg() terminates the chain */
                wrq->Next           = wrq + 1;
                wrq->WorkReqId      = kibnal_ptr2wreqid(tx, IBNAL_WID_RDMA);
                wrq->Operation      = WROpRdmaWrite;
                wrq->DSList         = gl;
                wrq->DSListDepth    = 1;
                wrq->MessageLen     = nob;

                wrq->Req.SendRC.ImmediateData                = 0;
                wrq->Req.SendRC.Options.s.SolicitedEvent     = 0;
                wrq->Req.SendRC.Options.s.SignaledCompletion = 0;
                wrq->Req.SendRC.Options.s.ImmediateData      = 0;
                wrq->Req.SendRC.Options.s.Fence              = 0; 

                wrq->Req.SendRC.RemoteDS.Address = dstfrag->rf_addr;
                wrq->Req.SendRC.RemoteDS.Rkey    = dstrd->rd_key;

                resid -= wrknob;
                /* advance within, or step past, the source frag */
                if (wrknob < srcfrag->rf_nob) {
                        srcfrag->rf_addr += wrknob;
                        srcfrag->rf_nob -= wrknob;
                } else {
                        srcfrag++;
                        srcidx++;
                }

                /* advance within, or step past, the destination frag */
                if (wrknob < dstfrag->rf_nob) {
                        dstfrag->rf_addr += wrknob;
                        dstfrag->rf_nob -= wrknob;
                } else {
                        dstfrag++;
                        dstidx++;
                }

                tx->tx_nwrq++;
        }

        if (rc < 0)                             /* no RDMA if completing with failure */
                tx->tx_nwrq = 0;
#endif

        /* completion message reports the transfer status to the peer */
        ibmsg->ibm_u.completion.ibcm_status = rc;
        ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;
        kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));

        return rc;
}
1312
1313 void
1314 kibnal_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
1315 {
1316         spin_lock(&conn->ibc_lock);
1317         kibnal_queue_tx_locked (tx, conn);
1318         spin_unlock(&conn->ibc_lock);
1319         
1320         kibnal_check_sends(conn);
1321 }
1322
1323 void
1324 kibnal_schedule_active_connect_locked (kib_peer_t *peer, int proto_version)
1325 {
1326         /* Called holding kib_global_lock exclusive with IRQs disabled */
1327
1328         peer->ibp_version = proto_version;      /* proto version for new conn */
1329         peer->ibp_connecting++;                 /* I'm connecting */
1330         kibnal_peer_addref(peer);               /* extra ref for connd */
1331
1332         spin_lock(&kibnal_data.kib_connd_lock);
1333
1334         list_add_tail (&peer->ibp_connd_list, &kibnal_data.kib_connd_peers);
1335         wake_up (&kibnal_data.kib_connd_waitq);
1336
1337         spin_unlock(&kibnal_data.kib_connd_lock);
1338 }
1339
1340 void
1341 kibnal_schedule_active_connect (kib_peer_t *peer, int proto_version)
1342 {
1343         unsigned long flags;
1344
1345         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1346
1347         kibnal_schedule_active_connect_locked(peer, proto_version);
1348
1349         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1350 }
1351
/* Commit to sending 'tx' to 'nid': queue it on an existing connection if
 * there is one, otherwise create the peer and/or schedule a connection
 * attempt and park the tx until the connection establishes.  On any
 * failure the tx is completed with -EHOSTUNREACH. */
void
kibnal_launch_tx (kib_tx_t *tx, lnet_nid_t nid)
{
        kib_peer_t      *peer;
        kib_conn_t      *conn;
        unsigned long    flags;
        rwlock_t        *g_lock = &kibnal_data.kib_global_lock;
        int              retry;
        int              rc;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
        LASSERT (tx->tx_nwrq > 0);              /* work items have been set up */

        /* at most two passes: the second after adding a persistent peer */
        for (retry = 0; ; retry = 1) {
                read_lock_irqsave(g_lock, flags);

                peer = kibnal_find_peer_locked (nid);
                if (peer != NULL) {
                        conn = kibnal_find_conn_locked (peer);
                        if (conn != NULL) {
                                kibnal_conn_addref(conn); /* 1 ref for me... */
                                read_unlock_irqrestore(g_lock, flags);

                                kibnal_queue_tx (tx, conn);
                                kibnal_conn_decref(conn); /* ...to here */
                                return;
                        }
                }

                /* Making one or more connections; I'll need a write lock...
                 * NB IRQs stay disabled across the unlock/relock ('flags'
                 * was saved above; write_unlock_irqrestore restores it) */
                read_unlock(g_lock);
                write_lock(g_lock);

                /* re-check: peer may have come or gone while unlocked */
                peer = kibnal_find_peer_locked (nid);
                if (peer != NULL)
                        break;

                write_unlock_irqrestore(g_lock, flags);

                if (retry) {
                        /* the peer added on the first pass has vanished */
                        CERROR("Can't find peer %s\n", libcfs_nid2str(nid));

                        tx->tx_status = -EHOSTUNREACH;
                        tx->tx_waiting = 0;
                        kibnal_tx_done (tx);
                        return;
                }

                rc = kibnal_add_persistent_peer(nid);
                if (rc != 0) {
                        CERROR("Can't add peer %s: %d\n",
                               libcfs_nid2str(nid), rc);

                        tx->tx_status = -EHOSTUNREACH;
                        tx->tx_waiting = 0;
                        kibnal_tx_done (tx);
                        return;
                }
        }

        /* write lock held; peer exists but had no connection above */
        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kibnal_conn_addref(conn);       /* 1 ref for me... */
                write_unlock_irqrestore(g_lock, flags);

                kibnal_queue_tx (tx, conn);
                kibnal_conn_decref(conn);       /* ...until here */
                return;
        }

        if (!kibnal_peer_connecting(peer)) {
                /* respect the reconnect backoff before trying again */
                if (!(peer->ibp_reconnect_interval == 0 || /* first attempt */
                      time_after_eq(jiffies, peer->ibp_reconnect_time))) {
                        write_unlock_irqrestore(g_lock, flags);
                        tx->tx_status = -EHOSTUNREACH;
                        tx->tx_waiting = 0;
                        kibnal_tx_done (tx);
                        return;
                }

                kibnal_schedule_active_connect_locked(peer, IBNAL_MSG_VERSION);
        }

        /* A connection is being established; queue the message... */
        list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}
1444
1445 void
1446 kibnal_txlist_done (struct list_head *txlist, int status)
1447 {
1448         kib_tx_t *tx;
1449
1450         while (!list_empty (txlist)) {
1451                 tx = list_entry (txlist->next, kib_tx_t, tx_list);
1452
1453                 list_del (&tx->tx_list);
1454                 /* complete now */
1455                 tx->tx_waiting = 0;
1456                 tx->tx_status = status;
1457                 kibnal_tx_done (tx);
1458         }
1459 }
1460
1461 int
1462 kibnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1463 {
1464         lnet_hdr_t       *hdr = &lntmsg->msg_hdr; 
1465         int               type = lntmsg->msg_type; 
1466         lnet_process_id_t target = lntmsg->msg_target;
1467         int               target_is_router = lntmsg->msg_target_is_router;
1468         int               routing = lntmsg->msg_routing;
1469         unsigned int      payload_niov = lntmsg->msg_niov; 
1470         struct iovec     *payload_iov = lntmsg->msg_iov; 
1471         lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
1472         unsigned int      payload_offset = lntmsg->msg_offset;
1473         unsigned int      payload_nob = lntmsg->msg_len;
1474         kib_msg_t        *ibmsg;
1475         kib_tx_t         *tx;
1476         int               nob;
1477         int               rc;
1478
1479         /* NB 'private' is different depending on what we're sending.... */
1480
1481         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1482                payload_nob, payload_niov, libcfs_id2str(target));
1483
1484         LASSERT (payload_nob == 0 || payload_niov > 0);
1485         LASSERT (payload_niov <= LNET_MAX_IOV);
1486
1487         /* Thread context */
1488         LASSERT (!in_interrupt());
1489         /* payload is either all vaddrs or all pages */
1490         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1491
1492         switch (type) {
1493         default:
1494                 LBUG();
1495                 return (-EIO);
1496                 
1497         case LNET_MSG_ACK:
1498                 LASSERT (payload_nob == 0);
1499                 break;
1500
1501         case LNET_MSG_GET:
1502                 if (routing || target_is_router)
1503                         break;                  /* send IMMEDIATE */
1504                 
1505                 /* is the REPLY message too small for RDMA? */
1506                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
1507                 if (nob <= IBNAL_MSG_SIZE)
1508                         break;                  /* send IMMEDIATE */
1509
1510                 tx = kibnal_get_idle_tx();
1511                 if (tx == NULL) {
1512                         CERROR("Can allocate txd for GET to %s: \n",
1513                                libcfs_nid2str(target.nid));
1514                         return -ENOMEM;
1515                 }
1516                 
1517                 ibmsg = tx->tx_msg;
1518                 ibmsg->ibm_u.get.ibgm_hdr = *hdr;
1519                 ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie;
1520
1521                 if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
1522                         rc = kibnal_setup_rd_iov(tx, &ibmsg->ibm_u.get.ibgm_rd,
1523                                                  0,
1524                                                  lntmsg->msg_md->md_niov,
1525                                                  lntmsg->msg_md->md_iov.iov,
1526                                                  0, lntmsg->msg_md->md_length);
1527                 else
1528                         rc = kibnal_setup_rd_kiov(tx, &ibmsg->ibm_u.get.ibgm_rd,
1529                                                   0,
1530                                                   lntmsg->msg_md->md_niov,
1531                                                   lntmsg->msg_md->md_iov.kiov,
1532                                                   0, lntmsg->msg_md->md_length);
1533                 if (rc != 0) {
1534                         CERROR("Can't setup GET sink for %s: %d\n",
1535                                libcfs_nid2str(target.nid), rc);
1536                         kibnal_tx_done(tx);
1537                         return -EIO;
1538                 }
1539
1540 #if IBNAL_USE_FMR
1541                 nob = sizeof(kib_get_msg_t);
1542 #else
1543                 {
1544                         int n = ibmsg->ibm_u.get.ibgm_rd.rd_nfrag;
1545                         
1546                         nob = offsetof(kib_get_msg_t, ibgm_rd.rd_frags[n]);
1547                 }
1548 #endif
1549                 kibnal_init_tx_msg(tx, IBNAL_MSG_GET_REQ, nob);
1550
1551                 tx->tx_lntmsg[1] = lnet_create_reply_msg(kibnal_data.kib_ni,
1552                                                          lntmsg);
1553                 if (tx->tx_lntmsg[1] == NULL) {
1554                         CERROR("Can't create reply for GET -> %s\n",
1555                                libcfs_nid2str(target.nid));
1556                         kibnal_tx_done(tx);
1557                         return -EIO;
1558                 }
1559
1560                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg[0,1] on completion */
1561                 tx->tx_waiting = 1;             /* waiting for GET_DONE */
1562                 kibnal_launch_tx(tx, target.nid);
1563                 return 0;
1564
1565         case LNET_MSG_REPLY: 
1566         case LNET_MSG_PUT:
1567                 /* Is the payload small enough not to need RDMA? */
1568                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1569                 if (nob <= IBNAL_MSG_SIZE)
1570                         break;                  /* send IMMEDIATE */
1571
1572                 tx = kibnal_get_idle_tx();
1573                 if (tx == NULL) {
1574                         CERROR("Can't allocate %s txd for %s\n",
1575                                type == LNET_MSG_PUT ? "PUT" : "REPLY",
1576                                libcfs_nid2str(target.nid));
1577                         return -ENOMEM;
1578                 }
1579
1580                 if (payload_kiov == NULL)
1581                         rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 1,
1582                                                  payload_niov, payload_iov,
1583                                                  payload_offset, payload_nob);
1584                 else
1585                         rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 1,
1586                                                   payload_niov, payload_kiov,
1587                                                   payload_offset, payload_nob);
1588                 if (rc != 0) {
1589                         CERROR("Can't setup PUT src for %s: %d\n",
1590                                libcfs_nid2str(target.nid), rc);
1591                         kibnal_tx_done(tx);
1592                         return -EIO;
1593                 }
1594
1595                 ibmsg = tx->tx_msg;
1596                 ibmsg->ibm_u.putreq.ibprm_hdr = *hdr;
1597                 ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie;
1598                 kibnal_init_tx_msg(tx, IBNAL_MSG_PUT_REQ, sizeof(kib_putreq_msg_t));
1599
1600                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1601                 tx->tx_waiting = 1;             /* waiting for PUT_{ACK,NAK} */
1602                 kibnal_launch_tx(tx, target.nid);
1603                 return 0;
1604         }
1605
1606         /* send IMMEDIATE */
1607
1608         LASSERT (offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob])
1609                  <= IBNAL_MSG_SIZE);
1610
1611         tx = kibnal_get_idle_tx();
1612         if (tx == NULL) {
1613                 CERROR ("Can't send %d to %s: tx descs exhausted\n",
1614                         type, libcfs_nid2str(target.nid));
1615                 return -ENOMEM;
1616         }
1617
1618         ibmsg = tx->tx_msg;
1619         ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1620
1621         if (payload_kiov != NULL)
1622                 lnet_copy_kiov2flat(IBNAL_MSG_SIZE, ibmsg,
1623                                     offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1624                                     payload_niov, payload_kiov,
1625                                     payload_offset, payload_nob);
1626         else
1627                 lnet_copy_iov2flat(IBNAL_MSG_SIZE, ibmsg,
1628                                    offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1629                                    payload_niov, payload_iov,
1630                                    payload_offset, payload_nob);
1631
1632         nob = offsetof(kib_immediate_msg_t, ibim_payload[payload_nob]);
1633         kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE, nob);
1634
1635         tx->tx_lntmsg[0] = lntmsg;              /* finalise lntmsg on completion */
1636         kibnal_launch_tx(tx, target.nid);
1637         return 0;
1638 }
1639
/* Send the payload of an optimized GET reply: RDMA lntmsg's payload
 * directly into the sink described by the peer's GET_REQ ('rx'), then
 * complete with a GET_DONE message carrying the peer's cookie. */
void
kibnal_reply(lnet_ni_t *ni, kib_rx_t *rx, lnet_msg_t *lntmsg)
{
        lnet_process_id_t target = lntmsg->msg_target;
        unsigned int      niov = lntmsg->msg_niov; 
        struct iovec     *iov = lntmsg->msg_iov; 
        lnet_kiov_t      *kiov = lntmsg->msg_kiov;
        unsigned int      offset = lntmsg->msg_offset;
        unsigned int      nob = lntmsg->msg_len;
        kib_tx_t         *tx;
        int               rc;
        
        tx = kibnal_get_idle_tx();
        if (tx == NULL) {
                CERROR("Can't get tx for REPLY to %s\n",
                       libcfs_nid2str(target.nid));
                goto failed_0;
        }

        /* Map the reply payload as the local RDMA source; nothing to map
         * for a zero-length reply.  Payload is either all vaddrs (iov) or
         * all pages (kiov), never both. */
        if (nob == 0)
                rc = 0;
        else if (kiov == NULL)
                rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 1, 
                                         niov, iov, offset, nob);
        else
                rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 1, 
                                          niov, kiov, offset, nob);

        if (rc != 0) {
                CERROR("Can't setup GET src for %s: %d\n",
                       libcfs_nid2str(target.nid), rc);
                goto failed_1;
        }
        
        /* Set up the RDMA transfer into the peer's sink descriptor */
        rc = kibnal_init_rdma(tx, IBNAL_MSG_GET_DONE, nob,
                              &rx->rx_msg->ibm_u.get.ibgm_rd,
                              rx->rx_msg->ibm_u.get.ibgm_cookie);
        if (rc < 0) {
                CERROR("Can't setup rdma for GET from %s: %d\n", 
                       libcfs_nid2str(target.nid), rc);
                goto failed_1;
        }
        
        if (rc == 0) {
                /* No RDMA: local completion may happen now! */
                lnet_finalize(ni, lntmsg, 0);
        } else {
                /* RDMA: lnet_finalize(lntmsg) when it
                 * completes */
                tx->tx_lntmsg[0] = lntmsg;
        }
        
        kibnal_queue_tx(tx, rx->rx_conn);
        return;
        
 failed_1:
        kibnal_tx_done(tx);
 failed_0:
        /* Couldn't reply at all: fail lntmsg back to LNET */
        lnet_finalize(ni, lntmsg, -EIO);
}
1700
1701 int
1702 kibnal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
1703                    void **new_private)
1704 {
1705         kib_rx_t    *rx = private;
1706         kib_conn_t  *conn = rx->rx_conn;
1707
1708         if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {
1709                 /* Can't block if RDMA completions need normal credits */
1710                 LCONSOLE_ERROR(0x12d, "Dropping message from %s: no buffers free. "
1711                                "%s is running an old version of LNET that may "
1712                                "deadlock if messages wait for buffers)\n",
1713                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
1714                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
1715                 return -EDEADLK;
1716         }
1717         
1718         *new_private = private;
1719         return 0;
1720 }
1721
/* Deliver the payload of an incoming message previously matched by LNET.
 * 'private' is the rx handed up at parse time; 'lntmsg' describes the
 * destination buffers (NULL for a GET that matched nothing).
 * Delivery depends on message type:
 *   IMMEDIATE: payload is inline in the rx buffer; copy it out.
 *   PUT_REQ:   map the sink and send PUT_ACK so the peer can RDMA to us.
 *   GET_REQ:   RDMA the reply payload back (or GET_DONE/-ENODATA NAK).
 * Returns 0 or -errno; the rx is always re-posted before returning. */
int
kibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
             unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
             unsigned int offset, unsigned int mlen, unsigned int rlen)
{
        kib_rx_t    *rx = private;
        kib_msg_t   *rxmsg = rx->rx_msg;
        kib_conn_t  *conn = rx->rx_conn;
        kib_tx_t    *tx;
        kib_msg_t   *txmsg;
        int          nob;
        int          post_cred = 1;     /* return a credit when re-posting rx? */
        int          rc = 0;
        
        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch (rxmsg->ibm_type) {
        default:
                LBUG();
                
        case IBNAL_MSG_IMMEDIATE:
                /* Check the inline payload actually fits in what we received */
                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
                if (nob > rx->rx_nob) {
                        CERROR ("Immediate message from %s too big: %d(%d)\n",
                                libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
                                nob, rx->rx_nob);
                        rc = -EPROTO;
                        break;
                }

                if (kiov != NULL)
                        lnet_copy_flat2kiov(niov, kiov, offset,
                                            IBNAL_MSG_SIZE, rxmsg,
                                            offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                            mlen);
                else
                        lnet_copy_flat2iov(niov, iov, offset,
                                           IBNAL_MSG_SIZE, rxmsg,
                                           offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                           mlen);
                lnet_finalize (ni, lntmsg, 0);
                break;

        case IBNAL_MSG_PUT_REQ:
                if (mlen == 0) {
                        /* No sink: finalise now and NAK so the peer
                         * completes its PUT immediately */
                        lnet_finalize(ni, lntmsg, 0);
                        kibnal_send_completion(rx->rx_conn, IBNAL_MSG_PUT_NAK, 0,
                                               rxmsg->ibm_u.putreq.ibprm_cookie);
                        break;
                }
                
                tx = kibnal_get_idle_tx();
                if (tx == NULL) {
                        CERROR("Can't allocate tx for %s\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        /* Not replying will break the connection */
                        rc = -ENOMEM;
                        break;
                }

                /* Map the destination buffers so the peer can RDMA into them */
                txmsg = tx->tx_msg;
                if (kiov == NULL)
                        rc = kibnal_setup_rd_iov(tx, 
                                                 &txmsg->ibm_u.putack.ibpam_rd,
                                                 0,
                                                 niov, iov, offset, mlen);
                else
                        rc = kibnal_setup_rd_kiov(tx,
                                                  &txmsg->ibm_u.putack.ibpam_rd,
                                                  0,
                                                  niov, kiov, offset, mlen);
                if (rc != 0) {
                        CERROR("Can't setup PUT sink for %s: %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
                        kibnal_tx_done(tx);
                        /* tell peer it's over */
                        kibnal_send_completion(rx->rx_conn, IBNAL_MSG_PUT_NAK, rc,
                                               rxmsg->ibm_u.putreq.ibprm_cookie);
                        break;
                }

                /* Echo the peer's cookie and add our own for PUT_DONE matching */
                txmsg->ibm_u.putack.ibpam_src_cookie = rxmsg->ibm_u.putreq.ibprm_cookie;
                txmsg->ibm_u.putack.ibpam_dst_cookie = tx->tx_cookie;
#if IBNAL_USE_FMR
                nob = sizeof(kib_putack_msg_t);
#else
                {
                        /* message carries only the frags actually used */
                        int n = tx->tx_msg->ibm_u.putack.ibpam_rd.rd_nfrag;

                        nob = offsetof(kib_putack_msg_t, ibpam_rd.rd_frags[n]);
                }
#endif
                kibnal_init_tx_msg(tx, IBNAL_MSG_PUT_ACK, nob);

                tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
                tx->tx_waiting = 1;             /* waiting for PUT_DONE */
                kibnal_queue_tx(tx, conn);

                if (conn->ibc_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD)
                        post_cred = 0; /* peer still owns 'rx' for sending PUT_DONE */
                break;

        case IBNAL_MSG_GET_REQ:
                if (lntmsg != NULL) {
                        /* Optimized GET; RDMA lntmsg's payload */
                        kibnal_reply(ni, rx, lntmsg);
                } else {
                        /* GET didn't match anything */
                        kibnal_send_completion(rx->rx_conn, IBNAL_MSG_GET_DONE, 
                                               -ENODATA,
                                               rxmsg->ibm_u.get.ibgm_cookie);
                }
                break;
        }

        kibnal_post_rx(rx, post_cred, 0);
        return rc;
}
1843
1844 int
1845 kibnal_thread_start (int (*fn)(void *arg), void *arg)
1846 {
1847         long    pid = kernel_thread (fn, arg, 0);
1848
1849         if (pid < 0)
1850                 return ((int)pid);
1851
1852         atomic_inc (&kibnal_data.kib_nthreads);
1853         return (0);
1854 }
1855
/* Account for a NAL thread exiting (pairs with kibnal_thread_start) */
void
kibnal_thread_fini (void)
{
        atomic_dec (&kibnal_data.kib_nthreads);
}
1861
/* Record that 'peer' showed signs of life just now */
void
kibnal_peer_alive (kib_peer_t *peer)
{
        /* This is racy, but everyone's only writing cfs_time_current() */
        peer->ibp_last_alive = cfs_time_current();
        mb();           /* make the new timestamp visible to other CPUs */
}
1869
1870 void
1871 kibnal_peer_notify (kib_peer_t *peer)
1872 {
1873         time_t        last_alive = 0;
1874         int           error = 0;
1875         unsigned long flags;
1876         
1877         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1878
1879         if (list_empty(&peer->ibp_conns) &&
1880             peer->ibp_accepting == 0 &&
1881             peer->ibp_connecting == 0 &&
1882             peer->ibp_error != 0) {
1883                 error = peer->ibp_error;
1884                 peer->ibp_error = 0;
1885                 last_alive = cfs_time_current_sec() -
1886                              cfs_duration_sec(cfs_time_current() -
1887                                               peer->ibp_last_alive);
1888         }
1889         
1890         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1891         
1892         if (error != 0)
1893                 lnet_notify(kibnal_data.kib_ni, peer->ibp_nid, 0, last_alive);
1894 }
1895
1896 void
1897 kibnal_schedule_conn (kib_conn_t *conn)
1898 {
1899         unsigned long flags;
1900
1901         kibnal_conn_addref(conn);               /* ++ref for connd */
1902         
1903         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
1904
1905         list_add_tail (&conn->ibc_list, &kibnal_data.kib_connd_conns);
1906         wake_up (&kibnal_data.kib_connd_waitq);
1907                 
1908         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
1909 }
1910
/* Begin shutdown of an established connection: log why, detach it from
 * its peer, mark it DISCONNECTING and schedule it on the connd, which
 * completes the teardown. */
void
kibnal_close_conn_locked (kib_conn_t *conn, int error)
{
        /* This just does the immediate housekeeping to start shutdown of an
         * established connection.  'error' is zero for a normal shutdown.
         * Caller holds kib_global_lock exclusively in irq context */
        kib_peer_t       *peer = conn->ibc_peer;
        
        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);

        if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
                return; /* already being handled  */
        
        /* NB Can't take ibc_lock here (could be in IRQ context), without
         * risking deadlock, so access to ibc_{tx_queue,active_txs} is racey */

        if (error == 0 &&
            list_empty(&conn->ibc_tx_queue) &&
            list_empty(&conn->ibc_tx_queue_rsrvd) &&
            list_empty(&conn->ibc_tx_queue_nocred) &&
            list_empty(&conn->ibc_active_txs)) {
                /* clean shutdown: nothing outstanding */
                CDEBUG(D_NET, "closing conn to %s"
                       " rx# "LPD64" tx# "LPD64"\n", 
                       libcfs_nid2str(peer->ibp_nid),
                       conn->ibc_txseq, conn->ibc_rxseq);
        } else {
                /* abortive shutdown: note which tx lists are still busy */
                CDEBUG(D_NETERROR, "Closing conn to %s: error %d%s%s%s%s"
                       " rx# "LPD64" tx# "LPD64"\n",
                       libcfs_nid2str(peer->ibp_nid), error,
                       list_empty(&conn->ibc_tx_queue) ? "" : "(sending)",
                       list_empty(&conn->ibc_tx_queue_rsrvd) ? "" : "(sending_rsrvd)",
                       list_empty(&conn->ibc_tx_queue_nocred) ? "" : "(sending_nocred)",
                       list_empty(&conn->ibc_active_txs) ? "" : "(waiting)",
                       conn->ibc_txseq, conn->ibc_rxseq);
#if 0
                /* can't skip down the queue without holding ibc_lock (see above) */
                list_for_each(tmp, &conn->ibc_tx_queue) {
                        kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);
                        
                        CERROR("   queued tx type %x cookie "LPX64
                               " sending %d waiting %d ticks %ld/%d\n", 
                               tx->tx_msg->ibm_type, tx->tx_cookie, 
                               tx->tx_sending, tx->tx_waiting,
                               (long)(tx->tx_deadline - jiffies), HZ);
                }

                list_for_each(tmp, &conn->ibc_active_txs) {
                        kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);
                        
                        CERROR("   active tx type %x cookie "LPX64
                               " sending %d waiting %d ticks %ld/%d\n", 
                               tx->tx_msg->ibm_type, tx->tx_cookie, 
                               tx->tx_sending, tx->tx_waiting,
                               (long)(tx->tx_deadline - jiffies), HZ);
                }
#endif
        }

        /* remove conn from the peer's list */
        list_del (&conn->ibc_list);

        if (list_empty (&peer->ibp_conns)) {   /* no more conns */
                if (peer->ibp_persistence == 0 && /* non-persistent peer */
                    kibnal_peer_active(peer))     /* still in peer table */
                        kibnal_unlink_peer_locked (peer);

                peer->ibp_error = error; /* set/clear error on last conn */
        }

        kibnal_set_conn_state(conn, IBNAL_CONN_DISCONNECTING);

        /* connd finishes the teardown; then drop the list's ref */
        kibnal_schedule_conn(conn);
        kibnal_conn_decref(conn);               /* lose ibc_list's ref */
}
1984
1985 void
1986 kibnal_close_conn (kib_conn_t *conn, int error)
1987 {
1988         unsigned long flags;
1989         
1990         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1991
1992         kibnal_close_conn_locked (conn, error);
1993         
1994         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1995 }
1996
/* Process rxs that arrived on 'conn' before it reached the ESTABLISHED
 * state.  The global lock is dropped around each kibnal_handle_rx()
 * call (it may block), then retaken to pick the next rx safely. */
void
kibnal_handle_early_rxs(kib_conn_t *conn)
{
        unsigned long    flags;
        kib_rx_t        *rx;

        LASSERT (!in_interrupt());
        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
        
        write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
        while (!list_empty(&conn->ibc_early_rxs)) {
                rx = list_entry(conn->ibc_early_rxs.next,
                                kib_rx_t, rx_list);
                list_del(&rx->rx_list);
                write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
                
                kibnal_handle_rx(rx);
                
                write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
        }
        write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
}
2019
2020 void
2021 kibnal_abort_txs(kib_conn_t *conn, struct list_head *txs)
2022 {
2023         LIST_HEAD           (zombies); 
2024         struct list_head    *tmp;
2025         struct list_head    *nxt;
2026         kib_tx_t            *tx;
2027
2028         spin_lock(&conn->ibc_lock);
2029
2030         list_for_each_safe (tmp, nxt, txs) {
2031                 tx = list_entry (tmp, kib_tx_t, tx_list);
2032
2033                 if (txs == &conn->ibc_active_txs) {
2034                         LASSERT (!tx->tx_queued);
2035                         LASSERT (tx->tx_waiting || tx->tx_sending != 0);
2036                 } else {
2037                         LASSERT (tx->tx_queued);
2038                 }
2039                 
2040                 tx->tx_status = -ECONNABORTED;
2041                 tx->tx_queued = 0;
2042                 tx->tx_waiting = 0;
2043                 
2044                 if (tx->tx_sending == 0) {
2045                         list_del (&tx->tx_list);
2046                         list_add (&tx->tx_list, &zombies);
2047                 }
2048         }
2049
2050         spin_unlock(&conn->ibc_lock);
2051
2052         kibnal_txlist_done(&zombies, -ECONNABORTED);
2053 }
2054
2055 void
2056 kibnal_conn_disconnected(kib_conn_t *conn)
2057 {
2058         static IB_QP_ATTRIBUTES_MODIFY qpam = {.RequestState = QPStateError};
2059
2060         FSTATUS           frc;
2061
2062         LASSERT (conn->ibc_state >= IBNAL_CONN_INIT_QP);
2063
2064         kibnal_set_conn_state(conn, IBNAL_CONN_DISCONNECTED);
2065
2066         /* move QP to error state to make posted work items complete */
2067         frc = iba_modify_qp(conn->ibc_qp, &qpam, NULL);
2068         if (frc != FSUCCESS)
2069                 CERROR("can't move qp state to error: %d\n", frc);
2070
2071         /* Complete all tx descs not waiting for sends to complete.
2072          * NB we should be safe from RDMA now that the QP has changed state */
2073
2074         kibnal_abort_txs(conn, &conn->ibc_tx_queue);
2075         kibnal_abort_txs(conn, &conn->ibc_tx_queue_rsrvd);
2076         kibnal_abort_txs(conn, &conn->ibc_tx_queue);
2077         kibnal_abort_txs(conn, &conn->ibc_active_txs);
2078
2079         kibnal_handle_early_rxs(conn);
2080 }
2081
/* Handle failure of a connection attempt of the given 'type' (active,
 * passive, or waiting for a passive connection).  Drops the matching
 * in-progress count; if no other attempt remains and we didn't get
 * connected meanwhile, backs off the reconnect interval, records the
 * error, notifies LNET and fails all transmits queued on the peer. */
void
kibnal_peer_connect_failed (kib_peer_t *peer, int type, int error)
{
        LIST_HEAD        (zombies);
        unsigned long     flags;

        LASSERT (error != 0);
        LASSERT (!in_interrupt());

        write_lock_irqsave(&kibnal_data.kib_global_lock, flags);

        LASSERT (kibnal_peer_connecting(peer));

        switch (type) {
        case IBNAL_CONN_ACTIVE:
                LASSERT (peer->ibp_connecting > 0);
                peer->ibp_connecting--;
                break;
                
        case IBNAL_CONN_PASSIVE:
                LASSERT (peer->ibp_accepting > 0);
                peer->ibp_accepting--;
                break;
                
        case IBNAL_CONN_WAITING:
                /* Can't assert; I might be racing with a successful connection
                 * which clears passivewait */
                peer->ibp_passivewait = 0;
                break;
        default:
                LBUG();
        }

        if (kibnal_peer_connecting(peer) ||     /* another attempt underway */
            !list_empty(&peer->ibp_conns)) {    /* got connected */
                write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
                return;
        }

        /* Say when active connection can be re-attempted */
        peer->ibp_reconnect_interval *= 2;
        /* exponential backoff, clamped to [min,max] tunables */
        peer->ibp_reconnect_interval =
                MAX(peer->ibp_reconnect_interval,
                    *kibnal_tunables.kib_min_reconnect_interval);
        peer->ibp_reconnect_interval =
                MIN(peer->ibp_reconnect_interval,
                    *kibnal_tunables.kib_max_reconnect_interval);
        
        peer->ibp_reconnect_time = jiffies + peer->ibp_reconnect_interval * HZ;

        /* Take peer's blocked transmits to complete with error */
        list_add(&zombies, &peer->ibp_tx_queue);
        list_del_init(&peer->ibp_tx_queue);
                
        if (kibnal_peer_active(peer) &&
            peer->ibp_persistence == 0) {
                /* failed connection attempt on non-persistent peer */
                kibnal_unlink_peer_locked (peer);
        }

        peer->ibp_error = error;
        
        write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);

        /* tell LNET the peer went down (consumes ibp_error) */
        kibnal_peer_notify(peer);

        if (list_empty (&zombies))
                return;
        
        CDEBUG (D_NETERROR, "Deleting messages for %s: connection failed\n",
                libcfs_nid2str(peer->ibp_nid));

        kibnal_txlist_done (&zombies, -EHOSTUNREACH);
}
2156
/* Complete a connection attempt on 'conn' with the given 'status'.
 * 'type' says whether we were the active or passive side.  On failure,
 * account the failed attempt and tear the conn down.  On success, the
 * conn joins its peer's conn list and any transmits blocked waiting
 * for a connection are queued on it. */
void
kibnal_connreq_done (kib_conn_t *conn, int type, int status)
{
        kib_peer_t       *peer = conn->ibc_peer;
        struct list_head  txs;
        kib_tx_t         *tx;
        unsigned long     flags;

        LASSERT (!in_interrupt());
        LASSERT (type == IBNAL_CONN_ACTIVE || type == IBNAL_CONN_PASSIVE);
        LASSERT (conn->ibc_state >= IBNAL_CONN_INIT_QP);
        LASSERT (conn->ibc_state < IBNAL_CONN_ESTABLISHED);
        LASSERT (kibnal_peer_connecting(peer));

        /* connection-setup state is no longer needed either way */
        LIBCFS_FREE(conn->ibc_cvars, sizeof(*conn->ibc_cvars));
        conn->ibc_cvars = NULL;

        if (status != 0) {
                /* failed to establish connection */
                kibnal_peer_connect_failed(conn->ibc_peer, type, status);
                kibnal_conn_disconnected(conn);
                kibnal_conn_decref(conn);       /* Lose CM's ref */
                return;
        }

        /* connection established */
        LASSERT(conn->ibc_state == IBNAL_CONN_CONNECTING);

        conn->ibc_last_send = jiffies;
        kibnal_set_conn_state(conn, IBNAL_CONN_ESTABLISHED);
        kibnal_peer_alive(peer);

        CDEBUG(D_NET, "Connection %s ESTABLISHED\n",
               libcfs_nid2str(conn->ibc_peer->ibp_nid));

        write_lock_irqsave(&kibnal_data.kib_global_lock, flags);

        peer->ibp_passivewait = 0;              /* not waiting (got conn now) */
        kibnal_conn_addref(conn);               /* +1 ref for ibc_list */
        list_add_tail(&conn->ibc_list, &peer->ibp_conns);
        
        if (!kibnal_peer_active(peer)) {
                /* peer has been deleted */
                kibnal_close_conn_locked(conn, -ECONNABORTED);
                write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

                kibnal_peer_connect_failed(conn->ibc_peer, type, -ECONNABORTED);
                kibnal_conn_decref(conn);       /* lose CM's ref */
                return;
        }
        
        /* attempt succeeded: drop the matching in-progress count */
        switch (type) {
        case IBNAL_CONN_ACTIVE:
                LASSERT (peer->ibp_connecting > 0);
                peer->ibp_connecting--;
                break;

        case IBNAL_CONN_PASSIVE:
                LASSERT (peer->ibp_accepting > 0);
                peer->ibp_accepting--;
                break;
        default:
                LBUG();
        }
        
        peer->ibp_reconnect_interval = 0;       /* OK to reconnect at any time */

        /* Nuke any dangling conns from a different peer instance... */
        kibnal_close_stale_conns_locked(peer, conn->ibc_incarnation);

        /* grab txs blocking for a conn */
        list_add(&txs, &peer->ibp_tx_queue);
        list_del_init(&peer->ibp_tx_queue);

        write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
        
        /* Schedule blocked txs */
        spin_lock (&conn->ibc_lock);
        while (!list_empty (&txs)) {
                tx = list_entry (txs.next, kib_tx_t, tx_list);
                list_del (&tx->tx_list);

                kibnal_queue_tx_locked (tx, conn);
        }
        spin_unlock (&conn->ibc_lock);
        kibnal_check_sends (conn);
}
2244
2245 void
2246 kibnal_reject (lnet_nid_t nid, IB_HANDLE cep, int why)
2247 {
2248         static CM_REJECT_INFO  msgs[3];
2249         CM_REJECT_INFO        *msg = &msgs[why];
2250         FSTATUS                frc;
2251
2252         LASSERT (why >= 0 && why < sizeof(msgs)/sizeof(msgs[0]));
2253
2254         /* If I wasn't so lazy, I'd initialise this only once; it's effectively
2255          * read-only... */
2256         msg->Reason         = RC_USER_REJ;
2257         msg->PrivateData[0] = (IBNAL_MSG_MAGIC) & 0xff;
2258         msg->PrivateData[1] = (IBNAL_MSG_MAGIC >> 8) & 0xff;
2259         msg->PrivateData[2] = (IBNAL_MSG_MAGIC >> 16) & 0xff;
2260         msg->PrivateData[3] = (IBNAL_MSG_MAGIC >> 24) & 0xff;
2261         msg->PrivateData[4] = (IBNAL_MSG_VERSION) & 0xff;
2262         msg->PrivateData[5] = (IBNAL_MSG_VERSION >> 8) & 0xff;
2263         msg->PrivateData[6] = why;
2264
2265         frc = iba_cm_reject(cep, msg);
2266         if (frc != FSUCCESS)
2267                 CERROR("Error %d rejecting %s\n", frc, libcfs_nid2str(nid));
2268 }
2269
2270 void
2271 kibnal_check_connreject(kib_conn_t *conn, int type, CM_REJECT_INFO *rej)
2272 {
2273         kib_peer_t    *peer = conn->ibc_peer;
2274         unsigned long  flags;
2275         int            magic;
2276         int            version;
2277         int            why;
2278
2279         LASSERT (type == IBNAL_CONN_ACTIVE ||
2280                  type == IBNAL_CONN_PASSIVE);
2281
2282         CDEBUG(D_NET, "%s connection with %s rejected: %d\n",
2283                (type == IBNAL_CONN_ACTIVE) ? "Active" : "Passive",
2284                libcfs_nid2str(peer->ibp_nid), rej->Reason);
2285
2286         switch (rej->Reason) {
2287         case RC_STALE_CONN:
2288                 if (type == IBNAL_CONN_PASSIVE) {
2289                         CERROR("Connection to %s rejected (stale QP)\n",
2290                                libcfs_nid2str(peer->ibp_nid));
2291                 } else {
2292                         CWARN("Connection from %s rejected (stale QP): "
2293                               "retrying...\n", libcfs_nid2str(peer->ibp_nid));
2294
2295                         /* retry from scratch to allocate a new conn 
2296                          * which will use a different QP */
2297                         kibnal_schedule_active_connect(peer, peer->ibp_version);
2298                 }
2299
2300                 /* An FCM_DISCONNECTED callback is still outstanding: give it a
2301                  * ref since kibnal_connreq_done() drops the CM's ref on conn
2302                  * on failure */
2303                 kibnal_conn_addref(conn);
2304                 break;
2305
2306         case RC_USER_REJ:
2307                 magic   = (rej->PrivateData[0]) |
2308                           (rej->PrivateData[1] << 8) |
2309                           (rej->PrivateData[2] << 16) |
2310                           (rej->PrivateData[3] << 24);
2311                 version = (rej->PrivateData[4]) |
2312                           (rej->PrivateData[5] << 8);
2313                 why     = (rej->PrivateData[6]);
2314
2315                 /* retry with old proto version */
2316                 if (magic == IBNAL_MSG_MAGIC &&
2317                     version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD &&
2318                     conn->ibc_version == IBNAL_MSG_VERSION &&
2319                     type != IBNAL_CONN_PASSIVE) {
2320                         /* retry with a new conn */
2321                         CWARN ("Connection to %s refused: "
2322                                "retrying with old protocol version 0x%x\n", 
2323                                libcfs_nid2str(peer->ibp_nid), version);
2324                         kibnal_schedule_active_connect(peer, version);
2325                         break;
2326                 }
2327
2328                 if (magic != IBNAL_MSG_MAGIC ||
2329                     version != IBNAL_MSG_VERSION) {
2330                         CERROR("%s connection with %s rejected "
2331                                "(magic/ver %08x/%d why %d): "
2332                                "incompatible protocol\n",
2333                                (type == IBNAL_CONN_ACTIVE) ?
2334                                "Active" : "Passive",
2335                                libcfs_nid2str(peer->ibp_nid),
2336                                magic, version, why);
2337                         break;
2338                 }
2339
2340                 if (type == IBNAL_CONN_ACTIVE && 
2341                     why == IBNAL_REJECT_CONN_RACE) {
2342                         /* lost connection race */
2343                         CWARN("Connection to %s rejected: "
2344                               "lost connection race\n",
2345                               libcfs_nid2str(peer->ibp_nid));
2346
2347                         write_lock_irqsave(&kibnal_data.kib_global_lock, 
2348                                            flags);
2349
2350                         if (list_empty(&peer->ibp_conns)) {
2351                                 peer->ibp_passivewait = 1;
2352                                 peer->ibp_passivewait_deadline =
2353                                         jiffies + 
2354                                         (*kibnal_tunables.kib_timeout * HZ);
2355                         }
2356                         write_unlock_irqrestore(&kibnal_data.kib_global_lock, 
2357                                                 flags);
2358                         break;
2359                 }
2360
2361                 CERROR("%s connection with %s rejected: %d\n",
2362                        (type == IBNAL_CONN_ACTIVE) ? "Active" : "Passive",
2363                        libcfs_nid2str(peer->ibp_nid), why);
2364                 break;
2365
2366         default:
2367                 CERROR("%s connection with %s rejected: %d\n",
2368                        (type == IBNAL_CONN_ACTIVE) ? "Active" : "Passive",
2369                        libcfs_nid2str(peer->ibp_nid), rej->Reason);
2370         }
2371         
2372         kibnal_connreq_done(conn, type, -ECONNREFUSED);
2373 }
2374
2375 void
2376 kibnal_cm_disconnect_callback(kib_conn_t *conn, CM_CONN_INFO *info)
2377 {
2378         CDEBUG(D_NET, "%s: state %d, status 0x%x\n", 
2379                libcfs_nid2str(conn->ibc_peer->ibp_nid),
2380                conn->ibc_state, info->Status);
2381         
2382         LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
2383
2384         switch (info->Status) {
2385         default:
2386                 LBUG();
2387                 break;
2388
2389         case FCM_DISCONNECT_REQUEST:
2390                 /* Schedule conn to iba_cm_disconnect() if it wasn't already */
2391                 kibnal_close_conn (conn, 0);
2392                 break;
2393
2394         case FCM_DISCONNECT_REPLY:              /* peer acks my disconnect req */
2395         case FCM_DISCONNECTED:                  /* end of TIME_WAIT */
2396                 CDEBUG(D_NET, "Connection %s disconnected.\n",
2397                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
2398                 kibnal_conn_decref(conn);       /* Lose CM's ref */
2399                 break;
2400         }
2401 }
2402
2403 void
2404 kibnal_cm_passive_callback(IB_HANDLE cep, CM_CONN_INFO *info, void *arg)
2405 {
2406         kib_conn_t       *conn = arg;
2407
2408         CDEBUG(D_NET, "status 0x%x\n", info->Status);
2409
2410         /* Established Connection Notifier */
2411         switch (info->Status) {
2412         default:
2413                 CERROR("Unexpected status %d on Connection %s\n",
2414                        info->Status, libcfs_nid2str(conn->ibc_peer->ibp_nid));
2415                 LBUG();
2416                 break;
2417
2418         case FCM_CONNECT_TIMEOUT:
2419                 kibnal_connreq_done(conn, IBNAL_CONN_PASSIVE, -ETIMEDOUT);
2420                 break;
2421                 
2422         case FCM_CONNECT_REJECT:
2423                 kibnal_check_connreject(conn, IBNAL_CONN_PASSIVE, 
2424                                         &info->Info.Reject);
2425                 break;
2426
2427         case FCM_CONNECT_ESTABLISHED:
2428                 kibnal_connreq_done(conn, IBNAL_CONN_PASSIVE, 0);
2429                 break;
2430
2431         case FCM_DISCONNECT_REQUEST:
2432         case FCM_DISCONNECT_REPLY:
2433         case FCM_DISCONNECTED:
2434                 kibnal_cm_disconnect_callback(conn, info);
2435                 break;
2436         }
2437 }
2438
/* Validate an incoming CONNREQ and set up a new passive conn.
 *
 * On success *connp holds a new conn in CONNECTING state (with refs on
 * itself and its peer) and the caller completes the CM accept.  On any
 * failure the CM endpoint 'cep' is rejected with an appropriate reason
 * code and a negative errno is returned. */
int
kibnal_accept (kib_conn_t **connp, IB_HANDLE cep, kib_msg_t *msg, int nob)
{
        lnet_nid_t     nid;
        kib_conn_t    *conn;
        kib_peer_t    *peer;
        kib_peer_t    *peer2;
        unsigned long  flags;
        int            rc;

        rc = kibnal_unpack_msg(msg, 0, nob);
        if (rc != 0) {
                /* SILENT! kibnal_unpack_msg() complains if required */
                kibnal_reject(LNET_NID_ANY, cep, IBNAL_REJECT_FATAL);
                return -EPROTO;
        }

        nid = msg->ibm_srcnid;

        /* old protocol versions are tolerated: warn but carry on */
        if (msg->ibm_version != IBNAL_MSG_VERSION)
                CWARN("Connection from %s: old protocol version 0x%x\n",
                      libcfs_nid2str(nid), msg->ibm_version);

        if (msg->ibm_type != IBNAL_MSG_CONNREQ) {
                CERROR("Can't accept %s: bad request type %d (%d expected)\n",
                       libcfs_nid2str(nid), msg->ibm_type, IBNAL_MSG_CONNREQ);
                kibnal_reject(nid, cep, IBNAL_REJECT_FATAL);
                return -EPROTO;
        }
        
        if (msg->ibm_dstnid != kibnal_data.kib_ni->ni_nid) {
                CERROR("Can't accept %s: bad dst NID %s (%s expected)\n",
                       libcfs_nid2str(nid), 
                       libcfs_nid2str(msg->ibm_dstnid), 
                       libcfs_nid2str(kibnal_data.kib_ni->ni_nid));
                kibnal_reject(nid, cep, IBNAL_REJECT_FATAL);
                return -EPROTO;
        }
        
        /* connection parameters must match my queue depth exactly and
         * fit within my message size / RDMA fragment limits */
        if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE ||
            msg->ibm_u.connparams.ibcp_max_msg_size > IBNAL_MSG_SIZE ||
            msg->ibm_u.connparams.ibcp_max_frags > IBNAL_MAX_RDMA_FRAGS) {
                CERROR("Reject %s: q %d sz %d frag %d, (%d %d %d expected)\n",
                       libcfs_nid2str(nid), 
                       msg->ibm_u.connparams.ibcp_queue_depth,
                       msg->ibm_u.connparams.ibcp_max_msg_size,
                       msg->ibm_u.connparams.ibcp_max_frags,
                       IBNAL_MSG_QUEUE_SIZE,
                       IBNAL_MSG_SIZE,
                       IBNAL_MAX_RDMA_FRAGS);
                kibnal_reject(nid, cep, IBNAL_REJECT_FATAL);
                return -EPROTO;
        }

        conn = kibnal_create_conn(nid, msg->ibm_version);
        if (conn == NULL) {
                kibnal_reject(nid, cep, IBNAL_REJECT_NO_RESOURCES);
                return -ENOMEM;
        }
        
        /* assume 'nid' is a new peer */
        rc = kibnal_create_peer(&peer, nid);
        if (rc != 0) {
                kibnal_conn_decref(conn);
                kibnal_reject(nid, cep, IBNAL_REJECT_NO_RESOURCES);
                return -ENOMEM;
        }
        
        write_lock_irqsave (&kibnal_data.kib_global_lock, flags);

        peer2 = kibnal_find_peer_locked(nid);
        if (peer2 == NULL) {
                /* peer table takes my ref on peer */
                list_add_tail (&peer->ibp_list, kibnal_nid2peerlist(nid));
                LASSERT (peer->ibp_connecting == 0);
        } else {
                /* peer already known: drop the speculative one */
                kibnal_peer_decref(peer);
                peer = peer2;

                if (peer->ibp_connecting != 0 &&
                    peer->ibp_nid < kibnal_data.kib_ni->ni_nid) {
                        /* Resolve concurrent connection attempts in favour of
                         * the higher NID */
                        write_unlock_irqrestore(&kibnal_data.kib_global_lock, 
                                                flags);
                        kibnal_conn_decref(conn);
                        kibnal_reject(nid, cep, IBNAL_REJECT_CONN_RACE);
                        return -EALREADY;
                }
        }

        kibnal_peer_addref(peer); /* +1 ref for conn */
        peer->ibp_accepting++;

        kibnal_set_conn_state(conn, IBNAL_CONN_CONNECTING);
        conn->ibc_peer = peer;
        conn->ibc_incarnation = msg->ibm_srcstamp;
        conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
        conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
        /* all credits must be backed by posted receive buffers */
        LASSERT (conn->ibc_credits + conn->ibc_reserved_credits
                 <= IBNAL_RX_MSGS);

        write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

        *connp = conn;
        return 0;
}
2546
/* CM callback on the listening endpoint.  Handles listener cancellation
 * and incoming connection requests: on a valid CONNREQ it builds a conn
 * (kibnal_accept), moves the QP to RTS and replies with a CONNACK via
 * iba_cm_accept(); kibnal_cm_passive_callback() drives the rest of the
 * handshake. */
void
kibnal_listen_callback(IB_HANDLE cep, CM_CONN_INFO *info, void *arg)
{

        CM_REQUEST_INFO  *req = &info->Info.Request;
        CM_REPLY_INFO    *rep;
        kib_conn_t       *conn;
        FSTATUS           frc;
        int               rc;
        
        LASSERT(arg == NULL); /* no conn yet for passive */

        CDEBUG(D_NET, "%x\n", info->Status);
        
        if (info->Status == FCM_CONNECT_CANCEL) {
                /* listener cancelled: wake whoever is waiting on it */
                up(&kibnal_data.kib_listener_signal);
                return;
        }
        
        LASSERT (info->Status == FCM_CONNECT_REQUEST);

        rc = kibnal_accept(&conn, cep, (kib_msg_t *)req->PrivateData, 
                           CM_REQUEST_INFO_USER_LEN);
        if (rc != 0)                   /* kibnal_accept has rejected */
                return;

        conn->ibc_cvars->cv_path = req->PathInfo.Path;
        
        /* bring the QP to RTS using the requester's parameters */
        rc = kibnal_conn_rts(conn, 
                             req->CEPInfo.QPN, 
                             req->CEPInfo.OfferedInitiatorDepth,
                             req->CEPInfo.OfferedResponderResources,
                             req->CEPInfo.StartingPSN);
        if (rc != 0) {
                kibnal_reject(conn->ibc_peer->ibp_nid, cep, 
                              IBNAL_REJECT_NO_RESOURCES);
                kibnal_connreq_done(conn, IBNAL_CONN_PASSIVE, -ECONNABORTED);
                return;
        }

        /* build the CM reply from the QP's actual attributes */
        memset(&conn->ibc_cvars->cv_cmci, 0, sizeof(conn->ibc_cvars->cv_cmci));
        rep = &conn->ibc_cvars->cv_cmci.Info.Reply;

        rep->QPN                   = conn->ibc_cvars->cv_qpattrs.QPNumber;
        rep->QKey                  = conn->ibc_cvars->cv_qpattrs.Qkey;
        rep->StartingPSN           = conn->ibc_cvars->cv_qpattrs.RecvPSN;
        rep->EndToEndFlowControl   = conn->ibc_cvars->cv_qpattrs.FlowControl;
        rep->ArbInitiatorDepth     = conn->ibc_cvars->cv_qpattrs.InitiatorDepth;
        rep->ArbResponderResources = conn->ibc_cvars->cv_qpattrs.ResponderResources;
        rep->TargetAckDelay        = kibnal_data.kib_hca_attrs.LocalCaAckDelay;
        rep->FailoverAccepted      = IBNAL_FAILOVER_ACCEPTED;
        rep->RnRRetryCount         = req->CEPInfo.RnrRetryCount;
        
        /* the CONNACK message must fit in the reply's private data */
        CLASSERT (CM_REPLY_INFO_USER_LEN >=
                  offsetof(kib_msg_t, ibm_u) + sizeof(kib_connparams_t));

        kibnal_pack_connmsg((kib_msg_t *)rep->PrivateData,
                            conn->ibc_version,
                            CM_REPLY_INFO_USER_LEN,
                            IBNAL_MSG_CONNACK,
                            conn->ibc_peer->ibp_nid, conn->ibc_incarnation);

        LASSERT (conn->ibc_cep == NULL);
        kibnal_set_conn_state(conn, IBNAL_CONN_CONNECTING);

        /* hand off to the CM; kibnal_cm_passive_callback completes the
         * handshake and receives the new endpoint in conn->ibc_cep */
        frc = iba_cm_accept(cep, 
                            &conn->ibc_cvars->cv_cmci,
                            NULL,
                            kibnal_cm_passive_callback, conn, 
                            &conn->ibc_cep);

        if (frc == FSUCCESS || frc == FPENDING)
                return;
        
        CERROR("iba_cm_accept(%s) failed: %d\n", 
               libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
        kibnal_connreq_done(conn, IBNAL_CONN_PASSIVE, -ECONNABORTED);
}
2625
2626 void
2627 kibnal_check_connreply(kib_conn_t *conn, CM_REPLY_INFO *rep)
2628 {
2629         kib_msg_t   *msg = (kib_msg_t *)rep->PrivateData;
2630         lnet_nid_t   nid = conn->ibc_peer->ibp_nid;
2631         FSTATUS      frc;
2632         int          rc;
2633
2634         rc = kibnal_unpack_msg(msg, conn->ibc_version, CM_REPLY_INFO_USER_LEN);
2635         if (rc != 0) {
2636                 CERROR ("Error %d unpacking connack from %s\n",
2637                         rc, libcfs_nid2str(nid));
2638                 kibnal_reject(nid, conn->ibc_cep, IBNAL_REJECT_FATAL);
2639                 kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EPROTO);
2640                 return;
2641         }
2642                         
2643         if (msg->ibm_type != IBNAL_MSG_CONNACK) {
2644                 CERROR("Bad connack request type %d (%d expected) from %s\n",
2645                        msg->ibm_type, IBNAL_MSG_CONNREQ,
2646                        libcfs_nid2str(msg->ibm_srcnid));
2647                 kibnal_reject(nid, conn->ibc_cep, IBNAL_REJECT_FATAL);
2648                 kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EPROTO);
2649                 return;
2650         }
2651
2652         if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
2653             msg->ibm_dstnid != kibnal_data.kib_ni->ni_nid ||
2654             msg->ibm_dststamp != kibnal_data.kib_incarnation) {
2655                 CERROR("Stale connack from %s(%s): %s(%s), "LPX64"("LPX64")\n",
2656                        libcfs_nid2str(msg->ibm_srcnid), 
2657                        libcfs_nid2str(conn->ibc_peer->ibp_nid),
2658                        libcfs_nid2str(msg->ibm_dstnid),
2659                        libcfs_nid2str(kibnal_data.kib_ni->ni_nid),
2660                        msg->ibm_dststamp, kibnal_data.kib_incarnation);
2661                 kibnal_reject(nid, conn->ibc_cep, IBNAL_REJECT_FATAL);
2662                 kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -ESTALE);
2663                 return;
2664         }
2665         
2666         if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE ||
2667             msg->ibm_u.connparams.ibcp_max_msg_size > IBNAL_MSG_SIZE ||
2668             msg->ibm_u.connparams.ibcp_max_frags > IBNAL_MAX_RDMA_FRAGS) {
2669                 CERROR("Reject %s: q %d sz %d frag %d, (%d %d %d expected)\n",
2670                        libcfs_nid2str(msg->ibm_srcnid), 
2671                        msg->ibm_u.connparams.ibcp_queue_depth,
2672                        msg->ibm_u.connparams.ibcp_max_msg_size,
2673                        msg->ibm_u.connparams.ibcp_max_frags,
2674                        IBNAL_MSG_QUEUE_SIZE,
2675                        IBNAL_MSG_SIZE,
2676                        IBNAL_MAX_RDMA_FRAGS);
2677                 kibnal_reject(nid, conn->ibc_cep, IBNAL_REJECT_FATAL);
2678                 kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EPROTO);
2679                 return;
2680         }
2681                         
2682         CDEBUG(D_NET, "Connection %s REP_RECEIVED.\n",
2683                libcfs_nid2str(conn->ibc_peer->ibp_nid));
2684
2685         conn->ibc_incarnation = msg->ibm_srcstamp;
2686         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2687         conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
2688         LASSERT (conn->ibc_credits + conn->ibc_reserved_credits
2689                  <= IBNAL_RX_MSGS);
2690
2691         rc = kibnal_conn_rts(conn, 
2692                              rep->QPN,
2693                              rep->ArbInitiatorDepth,
2694                              rep->ArbResponderResources,
2695                              rep->StartingPSN);
2696         if (rc != 0) {
2697                 kibnal_reject(nid, conn->ibc_cep, IBNAL_REJECT_NO_RESOURCES);
2698                 kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EIO);
2699                 return;
2700         }
2701
2702         memset(&conn->ibc_cvars->cv_cmci, 0, sizeof(conn->ibc_cvars->cv_cmci));
2703         
2704         frc = iba_cm_accept(conn->ibc_cep, 
2705                             &conn->ibc_cvars->cv_cmci, 
2706                             NULL, NULL, NULL, NULL);
2707
2708         if (frc == FCM_CONNECT_ESTABLISHED) {
2709                 kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, 0);
2710                 return;
2711         }
2712         
2713         CERROR("Connection %s CMAccept failed: %d\n",
2714                libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
2715         kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -ECONNABORTED);
2716 }
2717
2718 void
2719 kibnal_cm_active_callback(IB_HANDLE cep, CM_CONN_INFO *info, void *arg)
2720 {
2721         kib_conn_t       *conn = arg;
2722
2723         CDEBUG(D_NET, "status 0x%x\n", info->Status);
2724
2725         switch (info->Status) {
2726         default:
2727                 CERROR("unknown status %d on Connection %s\n", 
2728                        info->Status, libcfs_nid2str(conn->ibc_peer->ibp_nid));
2729                 LBUG();
2730                 break;
2731
2732         case FCM_CONNECT_TIMEOUT:
2733                 kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -ETIMEDOUT);
2734                 break;
2735                 
2736         case FCM_CONNECT_REJECT:
2737                 kibnal_check_connreject(conn, IBNAL_CONN_ACTIVE,
2738                                         &info->Info.Reject);
2739                 break;
2740
2741         case FCM_CONNECT_REPLY:
2742                 kibnal_check_connreply(conn, &info->Info.Reply);
2743                 break;
2744
2745         case FCM_DISCONNECT_REQUEST:
2746         case FCM_DISCONNECT_REPLY:
2747         case FCM_DISCONNECTED:
2748                 kibnal_cm_disconnect_callback(conn, info);
2749                 break;
2750         }
2751 }
2752
2753 void
2754 dump_path_records(PATH_RESULTS *results)
2755 {
2756         IB_PATH_RECORD *path;
2757         int i;
2758
2759         for (i = 0; i < results->NumPathRecords; i++) {
2760                 path = &results->PathRecords[i];
2761                 CDEBUG(D_NET, "%d: sgid "LPX64":"LPX64" dgid "
2762                        LPX64":"LPX64" pkey %x\n",
2763                        i,
2764                        path->SGID.Type.Global.SubnetPrefix,
2765                        path->SGID.Type.Global.InterfaceID,
2766                        path->DGID.Type.Global.SubnetPrefix,
2767                        path->DGID.Type.Global.InterfaceID,
2768                        path->P_Key);
2769         }
2770 }
2771
/* SD callback with the results of the path record query for an active
 * connect.  On success, saves the first path, creates a CM endpoint and
 * sends the CONNREQ via iba_cm_connect(); kibnal_cm_active_callback()
 * handles the rest.  Any failure completes the connect with an error. */
void
kibnal_pathreq_callback (void *arg, QUERY *qry, 
                         QUERY_RESULT_VALUES *qrslt)
{
        IB_CA_ATTRIBUTES  *ca_attr = &kibnal_data.kib_hca_attrs;
        kib_conn_t        *conn = arg;
        CM_REQUEST_INFO   *req = &conn->ibc_cvars->cv_cmci.Info.Request;
        PATH_RESULTS      *path = (PATH_RESULTS *)qrslt->QueryResult;
        FSTATUS            frc;
        
        if (qrslt->Status != FSUCCESS || 
            qrslt->ResultDataSize < sizeof(*path)) {
                CDEBUG (D_NETERROR, "pathreq %s failed: status %d data size %d\n", 
                        libcfs_nid2str(conn->ibc_peer->ibp_nid),
                        qrslt->Status, qrslt->ResultDataSize);
                kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EHOSTUNREACH);
                return;
        }

        if (path->NumPathRecords < 1) {
                CDEBUG (D_NETERROR, "pathreq %s failed: no path records\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
                kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EHOSTUNREACH);
                return;
        }

        //dump_path_records(path);
        /* use the first path record returned */
        conn->ibc_cvars->cv_path = path->PathRecords[0];

        LASSERT (conn->ibc_cep == NULL);

        conn->ibc_cep = kibnal_create_cep(conn->ibc_peer->ibp_nid);
        if (conn->ibc_cep == NULL) {
                kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -ENOMEM);
                return;
        }

        /* build the CM connection request from the resolved path, my
         * QP's attributes and the HCA's advertised limits */
        memset(req, 0, sizeof(*req));
        req->SID                               = conn->ibc_cvars->cv_svcrec.RID.ServiceID;
        req->CEPInfo.CaGUID                    = kibnal_data.kib_hca_guids[kibnal_data.kib_hca_idx];
        req->CEPInfo.EndToEndFlowControl       = IBNAL_EE_FLOW;
        req->CEPInfo.PortGUID                  = conn->ibc_cvars->cv_path.SGID.Type.Global.InterfaceID;
        req->CEPInfo.RetryCount                = IBNAL_RETRY;
        req->CEPInfo.RnrRetryCount             = IBNAL_RNR_RETRY;
        req->CEPInfo.AckTimeout                = IBNAL_ACK_TIMEOUT;
        req->CEPInfo.StartingPSN               = IBNAL_STARTING_PSN;
        req->CEPInfo.QPN                       = conn->ibc_cvars->cv_qpattrs.QPNumber;
        req->CEPInfo.QKey                      = conn->ibc_cvars->cv_qpattrs.Qkey;
        req->CEPInfo.OfferedResponderResources = ca_attr->MaxQPResponderResources;
        req->CEPInfo.OfferedInitiatorDepth     = ca_attr->MaxQPInitiatorDepth;
        req->PathInfo.bSubnetLocal             = IBNAL_LOCAL_SUB;
        req->PathInfo.Path                     = conn->ibc_cvars->cv_path;

        /* the CONNREQ message must fit in the request's private data */
        CLASSERT (CM_REQUEST_INFO_USER_LEN >=
                  offsetof(kib_msg_t, ibm_u) + sizeof(kib_connparams_t));

        kibnal_pack_connmsg((kib_msg_t *)req->PrivateData, 
                            conn->ibc_version,
                            CM_REQUEST_INFO_USER_LEN,
                            IBNAL_MSG_CONNREQ, 
                            conn->ibc_peer->ibp_nid, 0);

        if (the_lnet.ln_testprotocompat != 0) {
                /* single-shot proto test: deliberately corrupt the
                 * version or magic to exercise peer rejection paths */
                LNET_LOCK();
                if ((the_lnet.ln_testprotocompat & 1) != 0) {
                        ((kib_msg_t *)req->PrivateData)->ibm_version++;
                        the_lnet.ln_testprotocompat &= ~1;
                }
                if ((the_lnet.ln_testprotocompat & 2) != 0) {
                        ((kib_msg_t *)req->PrivateData)->ibm_magic =
                                LNET_PROTO_MAGIC;
                        the_lnet.ln_testprotocompat &= ~2;
                }
                LNET_UNLOCK();
        }

        /* Flag I'm getting involved with the CM... */
        kibnal_set_conn_state(conn, IBNAL_CONN_CONNECTING);

        /* cm callback gets my conn ref */
        frc = iba_cm_connect(conn->ibc_cep, req, 
                             kibnal_cm_active_callback, conn);
        if (frc == FPENDING || frc == FSUCCESS)
                return;
        
        CERROR ("Connect %s failed: %d\n", 
                libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
        kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EHOSTUNREACH);
}
2862
2863 void
2864 kibnal_dump_service_records(SERVICE_RECORD_RESULTS *results)
2865 {
2866         IB_SERVICE_RECORD *svc;
2867         int i;
2868
2869         for (i = 0; i < results->NumServiceRecords; i++) {
2870                 svc = &results->ServiceRecords[i];
2871                 CDEBUG(D_NET, "%d: sid "LPX64" gid "LPX64":"LPX64" pkey %x\n",
2872                        i,
2873                        svc->RID.ServiceID,
2874                        svc->RID.ServiceGID.Type.Global.SubnetPrefix,
2875                        svc->RID.ServiceGID.Type.Global.InterfaceID,
2876                        svc->RID.ServiceP_Key);
2877         }
2878 }
2879
/* SD callback with the results of the service record lookup for a peer.
 * On success, saves the first service record and launches the path
 * record query (kibnal_pathreq_callback continues the handshake); any
 * failure completes the active connect with -EHOSTUNREACH. */
void
kibnal_service_get_callback (void *arg, QUERY *qry, 
                             QUERY_RESULT_VALUES *qrslt)
{
        kib_conn_t              *conn = arg;
        SERVICE_RECORD_RESULTS  *svc;
        FSTATUS                  frc;

        if (qrslt->Status != FSUCCESS || 
            qrslt->ResultDataSize < sizeof(*svc)) {
                CDEBUG (D_NETERROR, "Lookup %s failed: status %d data size %d\n", 
                        libcfs_nid2str(conn->ibc_peer->ibp_nid),
                        qrslt->Status, qrslt->ResultDataSize);
                kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EHOSTUNREACH);
                return;
        }

        svc = (SERVICE_RECORD_RESULTS *)qrslt->QueryResult;
        if (svc->NumServiceRecords < 1) {
                CDEBUG (D_NETERROR, "lookup %s failed: no service records\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
                kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EHOSTUNREACH);
                return;
        }

        //kibnal_dump_service_records(svc);
        /* use the first matching service record */
        conn->ibc_cvars->cv_svcrec = svc->ServiceRecords[0];

        /* next step: resolve a path from my port to the peer's port */
        qry = &conn->ibc_cvars->cv_query;
        memset(qry, 0, sizeof(*qry));

        qry->OutputType = OutputTypePathRecord;
        qry->InputType = InputTypePortGuidPair;

        qry->InputValue.PortGuidPair.SourcePortGuid = 
                kibnal_data.kib_port_guid;
        qry->InputValue.PortGuidPair.DestPortGuid  = 
                conn->ibc_cvars->cv_svcrec.RID.ServiceGID.Type.Global.InterfaceID;

        /* kibnal_pathreq_callback gets my conn ref */
        frc = iba_sd_query_port_fabric_info(kibnal_data.kib_sd,
                                            kibnal_data.kib_port_guid,
                                            qry, 
                                            kibnal_pathreq_callback,
                                            &kibnal_data.kib_sdretry,
                                            conn);
        if (frc == FPENDING)
                return;

        CERROR ("pathreq %s failed: %d\n", 
                libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
        kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EHOSTUNREACH);
}
2933
/* Start an active connection attempt to 'peer': allocate a conn and
 * kick off the service record lookup; the rest of the handshake is
 * driven by SD/CM callbacks.  Failures are reported through
 * kibnal_peer_connect_failed()/kibnal_connreq_done(). */
void
kibnal_connect_peer (kib_peer_t *peer)
{
        QUERY                     *qry;
        FSTATUS                    frc;
        kib_conn_t                *conn;

        LASSERT (peer->ibp_connecting != 0);

        conn = kibnal_create_conn(peer->ibp_nid, peer->ibp_version);
        if (conn == NULL) {
                CERROR ("Can't allocate conn\n");
                kibnal_peer_connect_failed(peer, IBNAL_CONN_ACTIVE, -ENOMEM);
                return;
        }

        conn->ibc_peer = peer;
        kibnal_peer_addref(peer);

        /* look up the peer's service record by its NID-derived keys */
        qry = &conn->ibc_cvars->cv_query;
        memset(qry, 0, sizeof(*qry));

        qry->OutputType = OutputTypeServiceRecord;
        qry->InputType = InputTypeServiceRecord;

        qry->InputValue.ServiceRecordValue.ComponentMask = 
                KIBNAL_SERVICE_KEY_MASK;
        kibnal_set_service_keys(
                &qry->InputValue.ServiceRecordValue.ServiceRecord, 
                peer->ibp_nid);

        /* kibnal_service_get_callback gets my conn ref */
        frc = iba_sd_query_port_fabric_info(kibnal_data.kib_sd,
                                            kibnal_data.kib_port_guid,
                                            qry,
                                            kibnal_service_get_callback,
                                            &kibnal_data.kib_sdretry, 
                                            conn);
        if (frc == FPENDING)
                return;

        CERROR("Lookup %s failed: %d\n", libcfs_nid2str(peer->ibp_nid), frc);
        kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EHOSTUNREACH);
}
2978
2979 int
2980 kibnal_check_txs (kib_conn_t *conn, struct list_head *txs)
2981 {
2982         kib_tx_t          *tx;
2983         struct list_head  *ttmp;
2984         int                timed_out = 0;
2985
2986         spin_lock(&conn->ibc_lock);
2987
2988         list_for_each (ttmp, txs) {
2989                 tx = list_entry (ttmp, kib_tx_t, tx_list);
2990
2991                 if (txs == &conn->ibc_active_txs) {
2992                         LASSERT (!tx->tx_queued);
2993                         LASSERT (tx->tx_waiting || tx->tx_sending != 0);
2994                 } else {
2995                         LASSERT (tx->tx_queued);
2996                 }
2997
2998                 if (time_after_eq (jiffies, tx->tx_deadline)) {
2999                         timed_out = 1;
3000                         break;
3001                 }
3002         }
3003
3004         spin_unlock(&conn->ibc_lock);
3005         return timed_out;
3006 }
3007
3008 int
3009 kibnal_conn_timed_out (kib_conn_t *conn)
3010 {
3011         return  kibnal_check_txs(conn, &conn->ibc_tx_queue) ||
3012                 kibnal_check_txs(conn, &conn->ibc_tx_queue_rsrvd) ||
3013                 kibnal_check_txs(conn, &conn->ibc_tx_queue_nocred) ||
3014                 kibnal_check_txs(conn, &conn->ibc_active_txs);
3015 }
3016
/* Scan peer hash bucket 'idx' for (a) peers stuck past their passive-
 * wait deadline and (b) established conns with timed-out tx activity.
 * The scan holds the global lock read-only; whenever action is needed
 * the lock is dropped (after taking a temporary ref) and the scan
 * restarts from the top of the bucket. */
void
kibnal_check_peers (int idx)
{
        rwlock_t          *rwlock = &kibnal_data.kib_global_lock;
        struct list_head  *peers = &kibnal_data.kib_peers[idx];
        struct list_head  *ptmp;
        kib_peer_t        *peer;
        kib_conn_t        *conn;
        struct list_head  *ctmp;
        unsigned long      flags;

 again:
        /* NB. We expect to have a look at all the peers and not find any
         * rdmas to time out, so we just use a shared lock while we
         * take a look... */
        read_lock_irqsave(rwlock, flags);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, kib_peer_t, ibp_list);

                if (peer->ibp_passivewait) {
                        LASSERT (list_empty(&peer->ibp_conns));
                        
                        if (!time_after_eq(jiffies, 
                                           peer->ibp_passivewait_deadline))
                                continue;
                        
                        kibnal_peer_addref(peer); /* ++ ref for me... */
                        read_unlock_irqrestore(rwlock, flags);

                        /* passive wait expired: fail the connect attempt */
                        kibnal_peer_connect_failed(peer, IBNAL_CONN_WAITING,
                                                   -ETIMEDOUT);
                        kibnal_peer_decref(peer); /* ...until here */
                        
                        /* start again now I've dropped the lock */
                        goto again;
                }

                list_for_each (ctmp, &peer->ibp_conns) {
                        conn = list_entry (ctmp, kib_conn_t, ibc_list);

                        LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);

                        /* In case we have enough credits to return via a
                         * NOOP, but there were no non-blocking tx descs
                         * free to do it last time... */
                        kibnal_check_sends(conn);

                        if (!kibnal_conn_timed_out(conn))
                                continue;

                        /* Handle timeout by closing the whole connection.  We
                         * can only be sure RDMA activity has ceased once the
                         * QP has been modified. */
                        
                        kibnal_conn_addref(conn); /* 1 ref for me... */

                        read_unlock_irqrestore(rwlock, flags);

                        CERROR("Timed out RDMA with %s\n",
                               libcfs_nid2str(peer->ibp_nid));

                        kibnal_close_conn (conn, -ETIMEDOUT);
                        kibnal_conn_decref(conn); /* ...until here */

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(rwlock, flags);
}
3089
3090 void
3091 kibnal_disconnect_conn (kib_conn_t *conn)
3092 {
3093         FSTATUS       frc;
3094
3095         LASSERT (conn->ibc_state == IBNAL_CONN_DISCONNECTING);
3096
3097         kibnal_conn_disconnected(conn);
3098                 
3099         frc = iba_cm_disconnect(conn->ibc_cep, NULL, NULL);
3100         switch (frc) {
3101         case FSUCCESS:
3102                 break;
3103                 
3104         case FINSUFFICIENT_RESOURCES:
3105                 CERROR("ENOMEM disconnecting %s\n",
3106                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
3107                 /* This might cause the module to become unloadable since the
3108                  * FCM_DISCONNECTED callback is still outstanding */
3109                 break;
3110                 
3111         default:
3112                 CERROR("Unexpected error disconnecting %s: %d\n",
3113                        libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
3114                 LBUG();
3115         }
3116
3117         kibnal_peer_notify(conn->ibc_peer);
3118 }
3119
/* Connection daemon thread.
 *
 * Services three work lists under kib_connd_lock (dropping the lock
 * around each action): zombie conns to destroy, conns to disconnect,
 * and peers to (re)connect.  It also periodically sweeps the peer hash
 * table for timeouts via kibnal_check_peers().  Sleeps on
 * kib_connd_waitq when there is nothing to do, bounded by the time
 * until the next timeout sweep.  Runs until kib_shutdown is set. */
int
kibnal_connd (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        kib_conn_t        *conn;
        kib_peer_t        *peer;
        int                timeout;          /* jiffies until next sweep */
        int                i;
        int                did_something;    /* any work done this pass? */
        int                peer_index = 0;   /* next hash bucket to sweep */
        unsigned long      deadline = jiffies;
        
        cfs_daemonize ("kibnal_connd");
        cfs_block_allsigs ();

        init_waitqueue_entry (&wait, current);

        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);

        while (!kibnal_data.kib_shutdown) {
                did_something = 0;

                /* Reap one zombie connection (lock dropped around the
                 * actual destruction) */
                if (!list_empty (&kibnal_data.kib_connd_zombies)) {
                        conn = list_entry (kibnal_data.kib_connd_zombies.next,
                                           kib_conn_t, ibc_list);
                        list_del (&conn->ibc_list);
                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
                        did_something = 1;

                        kibnal_destroy_conn(conn);

                        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                }

                /* Disconnect one connection; the list's ref is dropped
                 * once the disconnect has been issued */
                if (!list_empty (&kibnal_data.kib_connd_conns)) {
                        conn = list_entry (kibnal_data.kib_connd_conns.next,
                                           kib_conn_t, ibc_list);
                        list_del (&conn->ibc_list);
                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
                        did_something = 1;

                        kibnal_disconnect_conn(conn);
                        kibnal_conn_decref(conn);
                        
                        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                }

                /* Initiate an active connect to one queued peer */
                if (!list_empty (&kibnal_data.kib_connd_peers)) {
                        peer = list_entry (kibnal_data.kib_connd_peers.next,
                                           kib_peer_t, ibp_connd_list);
                        
                        list_del_init (&peer->ibp_connd_list);
                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
                        did_something = 1;

                        kibnal_connect_peer (peer);
                        kibnal_peer_decref (peer);

                        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                }

                /* careful with the jiffy wrap... */
                while ((timeout = (int)(deadline - jiffies)) <= 0) {
                        const int n = 4;
                        const int p = 1;
                        int       chunk = kibnal_data.kib_peer_hash_size;
                        
                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

                        /* Time to check for RDMA timeouts on a few more
                         * peers: I do checks every 'p' seconds on a
                         * proportion of the peer table and I need to check
                         * every connection 'n' times within a timeout
                         * interval, to ensure I detect a timeout on any
                         * connection within (n+1)/n times the timeout
                         * interval. */

                        if (*kibnal_tunables.kib_timeout > n * p)
                                chunk = (chunk * n * p) / 
                                        *kibnal_tunables.kib_timeout;
                        if (chunk == 0)
                                chunk = 1;

                        for (i = 0; i < chunk; i++) {
                                kibnal_check_peers (peer_index);
                                peer_index = (peer_index + 1) % 
                                             kibnal_data.kib_peer_hash_size;
                        }

                        deadline += p * HZ;
                        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                        did_something = 1;
                }

                /* go round again immediately if anything happened */
                if (did_something)
                        continue;

                spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

                /* Register on the waitqueue BEFORE re-testing the work
                 * lists so a concurrent wake_up can't be lost */
                set_current_state (TASK_INTERRUPTIBLE);
                add_wait_queue (&kibnal_data.kib_connd_waitq, &wait);

                if (!kibnal_data.kib_shutdown &&
                    list_empty (&kibnal_data.kib_connd_conns) &&
                    list_empty (&kibnal_data.kib_connd_peers))
                        schedule_timeout (timeout);

                set_current_state (TASK_RUNNING);
                remove_wait_queue (&kibnal_data.kib_connd_waitq, &wait);

                spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
        }

        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

        kibnal_thread_fini ();
        return (0);
}
3239
3240
3241 void 
3242 kibnal_hca_async_callback (void *hca_arg, IB_EVENT_RECORD *ev)
3243 {
3244         /* XXX flesh out.  this seems largely for async errors */
3245         CERROR("type: %d code: %u\n", ev->EventType, ev->EventCode);
3246 }
3247
3248 void
3249 kibnal_hca_callback (void *hca_arg, void *cq_arg)
3250 {
3251         unsigned long flags;
3252
3253         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
3254         kibnal_data.kib_ready = 1;
3255         wake_up(&kibnal_data.kib_sched_waitq);
3256         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
3257 }
3258
/* Scheduler thread: polls the (single, shared) completion queue and
 * dispatches work completions.
 *
 * arg: thread id (a small integer), used only in the daemon name.
 *
 * Multiple scheduler threads may run; only one polls the CQ at a time
 * (ownership is the kib_checking_cq flag, guarded by kib_sched_lock).
 * After popping one completion the poller hands ownership back, wakes
 * another scheduler to keep draining, and processes its completion
 * outside the lock.  Runs until kib_shutdown is set. */
int
kibnal_scheduler(void *arg)
{
        long               id = (long)arg;
        wait_queue_t       wait;
        char               name[16];
        FSTATUS            frc;
        FSTATUS            frc2;
        IB_WORK_COMPLETION wc;
        kib_rx_t          *rx;
        unsigned long      flags;
        __u64              rxseq = 0;
        int                busy_loops = 0;   /* iterations since last resched */

        snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);

        while (!kibnal_data.kib_shutdown) {
                /* be a good citizen: yield the CPU periodically */
                if (busy_loops++ >= IBNAL_RESCHED) {
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);

                        our_cond_resched();
                        busy_loops = 0;
                        
                        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
                }

                if (kibnal_data.kib_ready &&
                    !kibnal_data.kib_checking_cq) {
                        /* take ownership of completion polling */
                        kibnal_data.kib_checking_cq = 1;
                        /* Assume I'll exhaust the CQ */
                        kibnal_data.kib_ready = 0;
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);
                        
                        frc = iba_poll_cq(kibnal_data.kib_cq, &wc);
                        if (frc == FNOT_DONE) {
                                /* CQ empty: re-arm so the HCA callback fires
                                 * on the next completion */
                                frc2 = iba_rearm_cq(kibnal_data.kib_cq,
                                                    CQEventSelNextWC);
                                LASSERT (frc2 == FSUCCESS);
                        }
                        
                        if (frc == FSUCCESS &&
                            kibnal_wreqid2type(wc.WorkReqId) == IBNAL_WID_RX) {
                                rx = (kib_rx_t *)kibnal_wreqid2ptr(wc.WorkReqId);
                                
                                /* Grab the RX sequence number NOW before
                                 * anyone else can get an RX completion */
                                rxseq = rx->rx_conn->ibc_rxseq++;
                        }
                                
                        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
                        /* give up ownership of completion polling */
                        kibnal_data.kib_checking_cq = 0;

                        if (frc == FNOT_DONE)
                                continue;

                        LASSERT (frc == FSUCCESS);
                        /* Assume there's more: get another scheduler to check
                         * while I handle this completion... */

                        kibnal_data.kib_ready = 1;
                        wake_up(&kibnal_data.kib_sched_waitq);

                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);

                        /* dispatch on the type encoded in the work request id */
                        switch (kibnal_wreqid2type(wc.WorkReqId)) {
                        case IBNAL_WID_RX:
                                kibnal_rx_complete(&wc, rxseq);
                                break;
                                
                        case IBNAL_WID_TX:
                                kibnal_tx_complete(&wc);
                                break;
                                
                        case IBNAL_WID_RDMA:
                                /* We only get RDMA completion notification if
                                 * it fails.  So we just ignore them completely
                                 * because...
                                 *
                                 * 1) If an RDMA fails, all subsequent work
                                 * items, including the final SEND will fail
                                 * too, so I'm still guaranteed to notice that
                                 * this connection is hosed.
                                 *
                                 * 2) It's positively dangerous to look inside
                                 * the tx descriptor obtained from an RDMA work
                                 * item.  As soon as I drop the kib_sched_lock,
                                 * I give a scheduler on another CPU a chance
                                 * to get the final SEND completion, so the tx
                                 * descriptor can get freed as I inspect it. */
                                CERROR ("RDMA failed: %d\n", wc.Status);
                                break;

                        default:
                                LBUG();
                        }
                        
                        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
                        continue;
                }

                /* Nothing to do; sleep... */

                /* exclusive wait: only one scheduler is woken per wake_up */
                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue_exclusive(&kibnal_data.kib_sched_waitq, &wait);
                spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                       flags);

                schedule();

                remove_wait_queue(&kibnal_data.kib_sched_waitq, &wait);
                set_current_state(TASK_RUNNING);
                spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
        }

        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);

        kibnal_thread_fini();
        return (0);
}