Whamcloud - gitweb
3e675488b600cfc017783a7f058d8019fceb1701
[fs/lustre-release.git] / lnet / klnds / iiblnd / iiblnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include "iiblnd.h"
25
/* Debug helper: dump 'len' bytes at 'ptr' to the console, labelled with
 * 'string'.  Output is 16 bytes per line with a space after every byte
 * pair.  The dump is compiled in but DISABLED by default; the original
 * code achieved this with an unconditional early return that left the
 * whole body as unreachable dead code.  Flip HEXDUMP_ON to 1 to
 * re-enable without further edits. */
void
hexdump(char *string, void *ptr, int len)
{
        enum { HEXDUMP_ON = 0 };        /* debug switch: 0 == no-op */
        unsigned char *c = ptr;
        int i;

        if (!HEXDUMP_ON)
                return;

        /* sanity-bound the dump so a corrupt length can't spam the log */
        if (len < 0 || len > 2048)  {
                printk("XXX what the hell? %d\n",len);
                return;
        }

        printk("%d bytes of '%s' from 0x%p\n", len, string, ptr);

        for (i = 0; i < len;) {
                printk("%02x",*(c++));
                i++;
                if (!(i & 15)) {
                        printk("\n");           /* end of a 16-byte row */
                } else if (!(i&1)) {
                        printk(" ");            /* gap after each byte pair */
                }
        }

        /* terminate the final partial row, if any */
        if (len & 15) {
                printk("\n");
        }
}
55
56 void
57 kibnal_tx_done (kib_tx_t *tx)
58 {
59         lnet_msg_t *lntmsg[2];
60         int         rc = tx->tx_status;
61         int         i;
62
63         LASSERT (!in_interrupt());
64         LASSERT (!tx->tx_queued);               /* mustn't be queued for sending */
65         LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting sent callback */
66         LASSERT (!tx->tx_waiting);              /* mustn't be awaiting peer response */
67
68 #if IBNAL_USE_FMR
69         /* Handle unmapping if required */
70 #endif
71         /* tx may have up to 2 lnet msgs to finalise */
72         lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
73         lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
74         
75         if (tx->tx_conn != NULL) {
76                 kibnal_conn_decref(tx->tx_conn);
77                 tx->tx_conn = NULL;
78         }
79
80         tx->tx_nwrq = 0;
81         tx->tx_status = 0;
82
83         spin_lock(&kibnal_data.kib_tx_lock);
84
85         list_add (&tx->tx_list, &kibnal_data.kib_idle_txs);
86
87         spin_unlock(&kibnal_data.kib_tx_lock);
88
89         /* delay finalize until my descs have been freed */
90         for (i = 0; i < 2; i++) {
91                 if (lntmsg[i] == NULL)
92                         continue;
93
94                 lnet_finalize (kibnal_data.kib_ni, lntmsg[i], rc);
95         }
96 }
97
98 kib_tx_t *
99 kibnal_get_idle_tx (void) 
100 {
101         kib_tx_t      *tx;
102         
103         spin_lock(&kibnal_data.kib_tx_lock);
104
105         if (list_empty (&kibnal_data.kib_idle_txs)) {
106                 spin_unlock(&kibnal_data.kib_tx_lock);
107                 return NULL;
108         }
109
110         tx = list_entry (kibnal_data.kib_idle_txs.next, kib_tx_t, tx_list);
111         list_del (&tx->tx_list);
112
113         /* Allocate a new completion cookie.  It might not be needed,
114          * but we've got a lock right now and we're unlikely to
115          * wrap... */
116         tx->tx_cookie = kibnal_data.kib_next_tx_cookie++;
117
118         spin_unlock(&kibnal_data.kib_tx_lock);
119
120         LASSERT (tx->tx_nwrq == 0);
121         LASSERT (!tx->tx_queued);
122         LASSERT (tx->tx_sending == 0);
123         LASSERT (!tx->tx_waiting);
124         LASSERT (tx->tx_status == 0);
125         LASSERT (tx->tx_conn == NULL);
126         LASSERT (tx->tx_lntmsg[0] == NULL);
127         LASSERT (tx->tx_lntmsg[1] == NULL);
128         
129         return tx;
130 }
131
/* (Re)post receive buffer 'rx' on its connection's QP.
 *
 * 'credit' non-zero returns one normal flow-control credit to the peer
 * for this buffer; 'rsrvd_credit' non-zero returns a reserved
 * (RDMA-reply) credit instead.
 *
 * Returns 0 on success (also when the connection is already past
 * ESTABLISHED: the rx is quietly retired and its conn ref dropped);
 * -EIO if the hardware post fails, which also closes the connection. */
int
kibnal_post_rx (kib_rx_t *rx, int credit, int rsrvd_credit)
{
        kib_conn_t   *conn = rx->rx_conn;
        int           rc = 0;
        FSTATUS       frc;

        LASSERT (!in_interrupt());
        /* old peers don't reserve rxs for RDMA replies */
        LASSERT (!rsrvd_credit ||
                 conn->ibc_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);

        /* single data segment covering the whole pre-mapped message buffer */
        rx->rx_gl = (IB_LOCAL_DATASEGMENT) {
                .Address = rx->rx_hca_msg,
                .Lkey    = kibnal_data.kib_whole_mem.md_lkey,
                .Length  = IBNAL_MSG_SIZE,
        };

        /* WorkReqId encodes the rx pointer + RX tag so the completion
         * handler can recover this descriptor */
        rx->rx_wrq = (IB_WORK_REQ2) {
                .Next          = NULL,
                .WorkReqId     = kibnal_ptr2wreqid(rx, IBNAL_WID_RX),
                .MessageLen    = IBNAL_MSG_SIZE,
                .DSList        = &rx->rx_gl,
                .DSListDepth   = 1,
                .Operation     = WROpRecv,
        };

        LASSERT (conn->ibc_state >= IBNAL_CONN_CONNECTING);
        LASSERT (rx->rx_nob >= 0);              /* not posted */

        CDEBUG(D_NET, "posting rx [%d %x "LPX64"]\n", 
               rx->rx_wrq.DSList->Length,
               rx->rx_wrq.DSList->Lkey,
               rx->rx_wrq.DSList->Address);

        if (conn->ibc_state > IBNAL_CONN_ESTABLISHED) {
                /* No more posts for this rx; so lose its ref */
                kibnal_conn_decref(conn);
                return 0;
        }
        
        rx->rx_nob = -1;                        /* flag posted */
        mb();                                   /* flag visible before HW can complete */

        frc = iba_post_recv2(conn->ibc_qp, &rx->rx_wrq, NULL);
        if (frc == FSUCCESS) {
                if (credit || rsrvd_credit) {
                        spin_lock(&conn->ibc_lock);

                        if (credit)
                                conn->ibc_outstanding_credits++;
                        if (rsrvd_credit)
                                conn->ibc_reserved_credits++;

                        spin_unlock(&conn->ibc_lock);

                        /* returned credits may unblock queued sends */
                        kibnal_check_sends(conn);
                }
                return 0;
        }
        
        CERROR ("post rx -> %s failed %d\n", 
                libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
        rc = -EIO;
        kibnal_close_conn(rx->rx_conn, rc);
        /* No more posts for this rx; so lose its ref */
        kibnal_conn_decref(conn);
        return rc;
}
201
202 int
203 kibnal_post_receives (kib_conn_t *conn)
204 {
205         int    i;
206         int    rc;
207
208         LASSERT (conn->ibc_state == IBNAL_CONN_CONNECTING);
209
210         for (i = 0; i < IBNAL_RX_MSGS; i++) {
211                 /* +1 ref for rx desc.  This ref remains until kibnal_post_rx
212                  * fails (i.e. actual failure or we're disconnecting) */
213                 kibnal_conn_addref(conn);
214                 rc = kibnal_post_rx (&conn->ibc_rxs[i], 0, 0);
215                 if (rc != 0)
216                         return rc;
217         }
218
219         return 0;
220 }
221
222 kib_tx_t *
223 kibnal_find_waiting_tx_locked(kib_conn_t *conn, int txtype, __u64 cookie)
224 {
225         struct list_head   *tmp;
226         
227         list_for_each(tmp, &conn->ibc_active_txs) {
228                 kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);
229                 
230                 LASSERT (!tx->tx_queued);
231                 LASSERT (tx->tx_sending != 0 || tx->tx_waiting);
232
233                 if (tx->tx_cookie != cookie)
234                         continue;
235
236                 if (tx->tx_waiting &&
237                     tx->tx_msg->ibm_type == txtype)
238                         return tx;
239
240                 CWARN("Bad completion: %swaiting, type %x (wanted %x)\n",
241                       tx->tx_waiting ? "" : "NOT ",
242                       tx->tx_msg->ibm_type, txtype);
243         }
244         return NULL;
245 }
246
247 void
248 kibnal_handle_completion(kib_conn_t *conn, int txtype, int status, __u64 cookie)
249 {
250         kib_tx_t    *tx;
251         int          idle;
252
253         spin_lock(&conn->ibc_lock);
254
255         tx = kibnal_find_waiting_tx_locked(conn, txtype, cookie);
256         if (tx == NULL) {
257                 spin_unlock(&conn->ibc_lock);
258
259                 CWARN("Unmatched completion type %x cookie "LPX64" from %s\n",
260                       txtype, cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
261                 kibnal_close_conn (conn, -EPROTO);
262                 return;
263         }
264
265         if (tx->tx_status == 0) {               /* success so far */
266                 if (status < 0) {               /* failed? */
267                         tx->tx_status = status;
268                 } else if (txtype == IBNAL_MSG_GET_REQ) {
269                         lnet_set_reply_msg_len(kibnal_data.kib_ni,
270                                                tx->tx_lntmsg[1], status);
271                 }
272         }
273         
274         tx->tx_waiting = 0;
275
276         idle = !tx->tx_queued && (tx->tx_sending == 0);
277         if (idle)
278                 list_del(&tx->tx_list);
279
280         spin_unlock(&conn->ibc_lock);
281         
282         if (idle)
283                 kibnal_tx_done(tx);
284 }
285
286 void
287 kibnal_send_completion (kib_conn_t *conn, int type, int status, __u64 cookie) 
288 {
289         kib_tx_t    *tx = kibnal_get_idle_tx();
290         
291         if (tx == NULL) {
292                 CERROR("Can't get tx for completion %x for %s\n",
293                        type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
294                 return;
295         }
296         
297         tx->tx_msg->ibm_u.completion.ibcm_status = status;
298         tx->tx_msg->ibm_u.completion.ibcm_cookie = cookie;
299         kibnal_init_tx_msg(tx, type, sizeof(kib_completion_msg_t));
300         
301         kibnal_queue_tx(tx, conn);
302 }
303
/* Dispatch a received, already-validated message (called from
 * kibnal_rx_complete once the connection is established).  Banks any
 * credits the peer returned, handles the message by type, closes the
 * connection on protocol error, and reposts the buffer unless LNET has
 * taken ownership of it (lnet_parse success). */
void
kibnal_handle_rx (kib_rx_t *rx)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        int           credits = msg->ibm_credits;  /* credits returned by peer */
        kib_tx_t     *tx;
        int           rc = 0;                   /* < 0 == protocol error */
        int           repost = 1;               /* repost rx buffer when done? */
        int           rsrvd_credit = 0;         /* return a reserved credit? */
        int           rc2;

        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);

        CDEBUG (D_NET, "Received %x[%d] from %s\n",
                msg->ibm_type, credits, libcfs_nid2str(conn->ibc_peer->ibp_nid));
        
        if (credits != 0) {
                /* Have I received credits that will let me send? */
                spin_lock(&conn->ibc_lock);
                conn->ibc_credits += credits;
                spin_unlock(&conn->ibc_lock);

                kibnal_check_sends(conn);
        }

        switch (msg->ibm_type) {
        default:
                CERROR("Bad IBNAL message type %x from %s\n",
                       msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                rc = -EPROTO;
                break;

        case IBNAL_MSG_NOOP:
                /* credit-return only; nothing else to do */
                break;

        case IBNAL_MSG_IMMEDIATE:
                /* payload is inline; hand the header to LNET */
                rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.immediate.ibim_hdr,
                                msg->ibm_srcnid, rx, 0);
                repost = rc < 0;                /* repost on error */
                break;
                
        case IBNAL_MSG_PUT_REQ:
                rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.putreq.ibprm_hdr,
                                msg->ibm_srcnid, rx, 1);
                repost = rc < 0;                /* repost on error */
                break;

        case IBNAL_MSG_PUT_NAK:
                /* peer rejected our PUT_REQ */
                rsrvd_credit = 1;               /* rdma reply (was pre-reserved) */

                CWARN ("PUT_NACK from %s\n", libcfs_nid2str(conn->ibc_peer->ibp_nid));
                kibnal_handle_completion(conn, IBNAL_MSG_PUT_REQ, 
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBNAL_MSG_PUT_ACK:
                /* peer accepted our PUT_REQ: set up the RDMA transfer */
                rsrvd_credit = 1;               /* rdma reply (was pre-reserved) */

                spin_lock(&conn->ibc_lock);
                tx = kibnal_find_waiting_tx_locked(conn, IBNAL_MSG_PUT_REQ,
                                                   msg->ibm_u.putack.ibpam_src_cookie);
                if (tx != NULL)
                        list_del(&tx->tx_list);
                spin_unlock(&conn->ibc_lock);

                if (tx == NULL) {
                        CERROR("Unmatched PUT_ACK from %s\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        rc = -EPROTO;
                        break;
                }

                LASSERT (tx->tx_waiting);
                /* CAVEAT EMPTOR: I could be racing with tx_complete, but...
                 * (a) I can overwrite tx_msg since my peer has received it!
                 * (b) tx_waiting set tells tx_complete() it's not done. */

                tx->tx_nwrq = 0;                /* overwrite PUT_REQ */

                rc2 = kibnal_init_rdma(tx, IBNAL_MSG_PUT_DONE, 
                                       kibnal_rd_size(&msg->ibm_u.putack.ibpam_rd),
                                       &msg->ibm_u.putack.ibpam_rd,
                                       msg->ibm_u.putack.ibpam_dst_cookie);
                if (rc2 < 0)
                        CERROR("Can't setup rdma for PUT to %s: %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc2);

                spin_lock(&conn->ibc_lock);
                if (tx->tx_status == 0 && rc2 < 0)
                        tx->tx_status = rc2;
                tx->tx_waiting = 0;             /* clear waiting and queue atomically */
                kibnal_queue_tx_locked(tx, conn);
                spin_unlock(&conn->ibc_lock);
                break;
                
        case IBNAL_MSG_PUT_DONE:
                /* This buffer was pre-reserved by not returning the credit
                 * when the PUT_REQ's buffer was reposted, so I just return it
                 * now */
                kibnal_handle_completion(conn, IBNAL_MSG_PUT_ACK,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBNAL_MSG_GET_REQ:
                rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.get.ibgm_hdr,
                                msg->ibm_srcnid, rx, 1);
                repost = rc < 0;                /* repost on error */
                break;

        case IBNAL_MSG_GET_DONE:
                rsrvd_credit = 1;               /* rdma reply (was pre-reserved) */

                kibnal_handle_completion(conn, IBNAL_MSG_GET_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;
        }

        if (rc < 0)                             /* protocol error */
                kibnal_close_conn(conn, rc);

        if (repost) {
                if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD)
                        rsrvd_credit = 0;       /* peer isn't pre-reserving */

                kibnal_post_rx(rx, !rsrvd_credit, rsrvd_credit);
        }
}
435
/* Receive-completion handler: validate a completed receive (HW status,
 * message unpack, peer identity/incarnation, sequence number) and pass
 * it on to kibnal_handle_rx.  Receives that arrive before the
 * connection reaches ESTABLISHED are parked on ibc_early_rxs; failures
 * close the connection and drop the rx's conn ref. */
void
kibnal_rx_complete (IB_WORK_COMPLETION *wc, __u64 rxseq)
{
        kib_rx_t     *rx = (kib_rx_t *)kibnal_wreqid2ptr(wc->WorkReqId);
        int           nob = wc->Length;
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        unsigned long flags;
        int           rc;
        int           err = -EIO;               /* default close reason */

        LASSERT (rx->rx_nob < 0);               /* was posted */
        rx->rx_nob = 0;                         /* isn't now */
        mb();                                   /* pairs with barrier in kibnal_post_rx */

        /* receives complete with error in any case after we've started
         * disconnecting */
        if (conn->ibc_state > IBNAL_CONN_ESTABLISHED)
                goto ignore;

        if (wc->Status != WRStatusSuccess) {
                CERROR("Rx from %s failed: %d\n", 
                       libcfs_nid2str(conn->ibc_peer->ibp_nid), wc->Status);
                goto failed;
        }

        rc = kibnal_unpack_msg(msg, conn->ibc_version, nob);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from %s\n",
                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                goto failed;
        }

        rx->rx_nob = nob;                       /* Now I know nob > 0 */
        mb();

        /* reject messages from a stale instance of the peer */
        if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
            msg->ibm_dstnid != kibnal_data.kib_ni->ni_nid ||
            msg->ibm_srcstamp != conn->ibc_incarnation ||
            msg->ibm_dststamp != kibnal_data.kib_incarnation) {
                CERROR ("Stale rx from %s\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
                err = -ESTALE;
                goto failed;
        }

        if (msg->ibm_seq != rxseq) {
                CERROR ("Out-of-sequence rx from %s"
                        ": got "LPD64" but expected "LPD64"\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid),
                        msg->ibm_seq, rxseq);
                goto failed;
        }

        /* set time last known alive */
        kibnal_peer_alive(conn->ibc_peer);

        /* racing with connection establishment/teardown! */

        if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
                /* must check holding global lock to eliminate race */
                if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                        /* too early: park for replay once established */
                        list_add_tail(&rx->rx_list, &conn->ibc_early_rxs);
                        write_unlock_irqrestore(&kibnal_data.kib_global_lock, 
                                                flags);
                        return;
                }
                write_unlock_irqrestore(&kibnal_data.kib_global_lock, 
                                        flags);
        }
        kibnal_handle_rx(rx);
        return;
        
 failed:
        kibnal_close_conn(conn, err);
 ignore:
        /* Don't re-post rx & drop its ref on conn */
        kibnal_conn_decref(conn);
}
516
517 struct page *
518 kibnal_kvaddr_to_page (unsigned long vaddr)
519 {
520         struct page *page;
521
522         if (vaddr >= VMALLOC_START &&
523             vaddr < VMALLOC_END) {
524                 page = vmalloc_to_page ((void *)vaddr);
525                 LASSERT (page != NULL);
526                 return page;
527         }
528 #if CONFIG_HIGHMEM
529         if (vaddr >= PKMAP_BASE &&
530             vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE)) {
531                 /* No highmem pages only used for bulk (kiov) I/O */
532                 CERROR("find page for address in highmem\n");
533                 LBUG();
534         }
535 #endif
536         page = virt_to_page (vaddr);
537         LASSERT (page != NULL);
538         return page;
539 }
540
541 #if !IBNAL_USE_FMR
542 int
543 kibnal_append_rdfrag(kib_rdma_desc_t *rd, int active, struct page *page, 
544                      unsigned long page_offset, unsigned long len)
545 {
546         kib_rdma_frag_t *frag = &rd->rd_frags[rd->rd_nfrag];
547
548         if (rd->rd_nfrag >= IBNAL_MAX_RDMA_FRAGS) {
549                 CERROR ("Too many RDMA fragments\n");
550                 return -EMSGSIZE;
551         }
552
553         if (active) {
554                 if (rd->rd_nfrag == 0)
555                         rd->rd_key = kibnal_data.kib_whole_mem.md_lkey;
556         } else {
557                 if (rd->rd_nfrag == 0)
558                         rd->rd_key = kibnal_data.kib_whole_mem.md_rkey;
559         }
560
561         frag->rf_nob  = len;
562         frag->rf_addr = kibnal_data.kib_whole_mem.md_addr +
563                         lnet_page2phys(page) + page_offset;
564
565         CDEBUG(D_NET,"map key %x frag [%d]["LPX64" for %d]\n", 
566                rd->rd_key, rd->rd_nfrag, frag->rf_addr, frag->rf_nob);
567
568         rd->rd_nfrag++;
569         return 0;
570 }
571
/* Build RDMA descriptor 'rd' covering 'nob' bytes of the iovec 'iov'
 * (of 'niov' entries) starting 'offset' bytes in.  Each page-bounded
 * piece becomes one fragment.  Returns 0, -EFAULT if a page can't be
 * found, or -EMSGSIZE if the descriptor overflows. */
int
kibnal_setup_rd_iov(kib_tx_t *tx, kib_rdma_desc_t *rd, int active,
                    unsigned int niov, struct iovec *iov, int offset, int nob)
                 
{
        int           fragnob;
        int           rc;
        unsigned long vaddr;
        struct page  *page;
        int           page_offset;

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT ((rd != tx->tx_rd) == !active);

        /* skip iovec entries wholly covered by 'offset' */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        rd->rd_nfrag = 0;
        do {
                LASSERT (niov > 0);

                vaddr = ((unsigned long)iov->iov_base) + offset;
                page_offset = vaddr & (PAGE_SIZE - 1);
                page = kibnal_kvaddr_to_page(vaddr);
                if (page == NULL) {
                        CERROR ("Can't find page\n");
                        return -EFAULT;
                }

                /* fragment limited by remaining iov entry, remaining nob,
                 * and the page boundary */
                fragnob = min((int)(iov->iov_len - offset), nob);
                fragnob = min(fragnob, (int)PAGE_SIZE - page_offset);

                rc = kibnal_append_rdfrag(rd, active, page, 
                                          page_offset, fragnob);
                if (rc != 0)
                        return rc;

                if (offset + fragnob < iov->iov_len) {
                        /* more of this iov entry left: advance within it */
                        offset += fragnob;
                } else {
                        /* entry exhausted: move to the next one */
                        offset = 0;
                        iov++;
                        niov--;
                }
                nob -= fragnob;
        } while (nob > 0);
        
        return 0;
}
626
627 int
628 kibnal_setup_rd_kiov (kib_tx_t *tx, kib_rdma_desc_t *rd, int active,
629                       int nkiov, lnet_kiov_t *kiov, int offset, int nob)
630 {
631         int            fragnob;
632         int            rc;
633
634         CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
635
636         LASSERT (nob > 0);
637         LASSERT (nkiov > 0);
638         LASSERT ((rd != tx->tx_rd) == !active);
639
640         while (offset >= kiov->kiov_len) {
641                 offset -= kiov->kiov_len;
642                 nkiov--;
643                 kiov++;
644                 LASSERT (nkiov > 0);
645         }
646
647         rd->rd_nfrag = 0;
648         do {
649                 LASSERT (nkiov > 0);
650                 fragnob = min((int)(kiov->kiov_len - offset), nob);
651                 
652                 rc = kibnal_append_rdfrag(rd, active, kiov->kiov_page,
653                                           kiov->kiov_offset + offset,
654                                           fragnob);
655                 if (rc != 0)
656                         return rc;
657
658                 offset = 0;
659                 kiov++;
660                 nkiov--;
661                 nob -= fragnob;
662         } while (nob > 0);
663
664         return 0;
665 }
666 #else
/* Register the physical pages in tx->tx_pages for RDMA described by
 * 'rd'.
 *
 * NOTE(review): this is a stub - the actual registration call is
 * commented out below and the function always returns -EINVAL, so the
 * FMR build path cannot currently map anything. */
int
kibnal_map_tx (kib_tx_t *tx, kib_rdma_desc_t *rd, int active,
               int npages, unsigned long page_offset, int nob)
{
        IB_ACCESS_CONTROL access = {0,};
        FSTATUS           frc;

        LASSERT ((rd != tx->tx_rd) == !active);
        LASSERT (!tx->tx_md.md_active);
        LASSERT (tx->tx_md.md_fmrcount > 0);
        LASSERT (page_offset < PAGE_SIZE);
        /* npages must span page_offset + nob */
        LASSERT (npages >= (1 + ((page_offset + nob - 1)>>PAGE_SHIFT)));
        LASSERT (npages <= LNET_MAX_IOV);

        if (!active) {
                /* passive side: grant the peer write access */
                // access.s.MWBindable = 1;
                access.s.LocalWrite = 1;
                access.s.RdmaWrite = 1;
        }

        /* Map the memory described by tx->tx_pages
        frc = iibt_register_physical_memory(kibnal_data.kib_hca,
                                            IBNAL_RDMA_BASE,
                                            tx->tx_pages, npages,
                                            page_offset,
                                            kibnal_data.kib_pd,
                                            access,
                                            &tx->tx_md.md_handle,
                                            &tx->tx_md.md_addr,
                                            &tx->tx_md.md_lkey,
                                            &tx->tx_md.md_rkey);
        */
        return -EINVAL;
}
701
/* FMR variant: collect the physical pages backing a single contiguous
 * iovec fragment into tx->tx_pages and hand them to kibnal_map_tx.
 * Only one iovec entry may be spanned ('nob' must fit in the current
 * entry after 'offset').  Returns 0, -EMSGSIZE, -EFAULT, or the
 * kibnal_map_tx result. */
int
kibnal_setup_rd_iov (kib_tx_t *tx, kib_rdma_desc_t *rd, int active,
                     unsigned int niov, struct iovec *iov, int offset, int nob)
                 
{
        int           resid;
        int           fragnob;
        struct page  *page;
        int           npages;
        unsigned long page_offset;
        unsigned long vaddr;

        LASSERT (nob > 0);
        LASSERT (niov > 0);

        /* skip iovec entries wholly covered by 'offset' */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR ("Can't map multiple vaddr fragments\n");
                return (-EMSGSIZE);
        }

        vaddr = ((unsigned long)iov->iov_base) + offset;
        
        page_offset = vaddr & (PAGE_SIZE - 1);
        resid = nob;
        npages = 0;

        /* walk the virtual range one page at a time */
        do {
                LASSERT (npages < LNET_MAX_IOV);

                page = kibnal_kvaddr_to_page(vaddr);
                if (page == NULL) {
                        CERROR("Can't find page for %lu\n", vaddr);
                        return -EFAULT;
                }

                tx->tx_pages[npages++] = lnet_page2phys(page);

                /* advance to the next page boundary */
                fragnob = PAGE_SIZE - (vaddr & (PAGE_SIZE - 1));
                vaddr += fragnob;
                resid -= fragnob;

        } while (resid > 0);

        return kibnal_map_tx(tx, rd, active, npages, page_offset, nob);
}
754
/* FMR variant: collect the physical pages backing the kiov into
 * tx->tx_pages and hand them to kibnal_map_tx.  The payload must be
 * virtually contiguous: every intermediate kiov entry must cover a
 * whole page (no gaps), else -EINVAL.  Returns 0 or an error from the
 * checks / kibnal_map_tx. */
int
kibnal_setup_rd_kiov (kib_tx_t *tx, kib_rdma_desc_t *rd, int active,
                      int nkiov, lnet_kiov_t *kiov, int offset, int nob)
{
        int            resid;
        int            npages;
        unsigned long  page_offset;
        
        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (nkiov <= LNET_MAX_IOV);
        LASSERT (!tx->tx_md.md_active);
        LASSERT ((rd != tx->tx_rd) == !active);

        /* skip kiov entries wholly covered by 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        page_offset = kiov->kiov_offset + offset;
        
        /* resid counts from the start of the first entry, so each whole
         * kiov_len can be subtracted below */
        resid = offset + nob;
        npages = 0;

        do {
                LASSERT (npages < LNET_MAX_IOV);
                LASSERT (nkiov > 0);

                if ((npages > 0 && kiov->kiov_offset != 0) ||
                    (resid > kiov->kiov_len && 
                     (kiov->kiov_offset + kiov->kiov_len) != PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR ("Can't make payload contiguous in I/O VM:"
                                "page %d, offset %d, len %d \n",
                                npages, kiov->kiov_offset, kiov->kiov_len);
                        
                        return -EINVAL;
                }

                tx->tx_pages[npages++] = lnet_page2phys(kiov->kiov_page);
                resid -= kiov->kiov_len;
                kiov++;
                nkiov--;
        } while (resid > 0);

        return kibnal_map_tx(tx, rd, active, npages, page_offset, nob);
}
806 #endif
807
808 kib_conn_t *
809 kibnal_find_conn_locked (kib_peer_t *peer)
810 {
811         struct list_head *tmp;
812
813         /* just return the first connection */
814         list_for_each (tmp, &peer->ibp_conns) {
815                 return (list_entry(tmp, kib_conn_t, ibc_list));
816         }
817
818         return (NULL);
819 }
820
/* Post as many queued transmits on 'conn' as flow-control credits and QP
 * capacity allow.  Called with ibc_lock NOT held; may close the connection
 * if a post fails. */
void
kibnal_check_sends (kib_conn_t *conn)
{
        kib_tx_t       *tx;
        FSTATUS         frc;            /* verb-layer status from the post */
        int             rc;
        int             consume_cred;   /* does this tx consume a send credit? */
        int             done;

        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
        
        spin_lock(&conn->ibc_lock);

        LASSERT (conn->ibc_nsends_posted <=
                *kibnal_tunables.kib_concurrent_sends);
        LASSERT (conn->ibc_reserved_credits >= 0);
        
        /* Reserved credits have arrived: move txs that were waiting for one
         * onto the normal send queue. */
        while (conn->ibc_reserved_credits > 0 &&
               !list_empty(&conn->ibc_tx_queue_rsrvd)) {
                LASSERT (conn->ibc_version != 
                         IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
                tx = list_entry(conn->ibc_tx_queue_rsrvd.next,
                                kib_tx_t, tx_list);
                list_del(&tx->tx_list);
                list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
                conn->ibc_reserved_credits--;
        }

        /* Nothing queued, but credits to return have piled up (or a
         * keepalive is due): queue a NOOP to carry them back to the peer. */
        if (list_empty(&conn->ibc_tx_queue) &&
            list_empty(&conn->ibc_tx_queue_nocred) &&
            (conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER ||
             kibnal_send_keepalive(conn))) {
                spin_unlock(&conn->ibc_lock);
                
                /* NB lock dropped around tx allocation; state re-checked by
                 * the NOOP-redundancy test in the loop below */
                tx = kibnal_get_idle_tx();
                if (tx != NULL)
                        kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);

                spin_lock(&conn->ibc_lock);
                
                if (tx != NULL)
                        kibnal_queue_tx_locked(tx, conn);
        }

        for (;;) {
                /* credit-exempt txs (e.g. credit returns) take priority */
                if (!list_empty(&conn->ibc_tx_queue_nocred)) {
                        LASSERT (conn->ibc_version != 
                                 IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
                        tx = list_entry (conn->ibc_tx_queue_nocred.next, 
                                         kib_tx_t, tx_list);
                        consume_cred = 0;
                } else if (!list_empty (&conn->ibc_tx_queue)) {
                        tx = list_entry (conn->ibc_tx_queue.next, 
                                         kib_tx_t, tx_list);
                        consume_cred = 1;
                } else {
                        /* nothing waiting */
                        break;
                }

                LASSERT (tx->tx_queued);
                /* We rely on this for QP sizing */
                LASSERT (tx->tx_nwrq > 0 && tx->tx_nwrq <= 1 + IBNAL_MAX_RDMA_FRAGS);

                LASSERT (conn->ibc_outstanding_credits >= 0);
                LASSERT (conn->ibc_outstanding_credits <= IBNAL_MSG_QUEUE_SIZE);
                LASSERT (conn->ibc_credits >= 0);
                LASSERT (conn->ibc_credits <= IBNAL_MSG_QUEUE_SIZE);

                /* QP send queue full? wait for completions */
                if (conn->ibc_nsends_posted ==
                    *kibnal_tunables.kib_concurrent_sends) {
                        /* We've got some tx completions outstanding... */
                        CDEBUG(D_NET, "%s: posted enough\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        break;
                }

                if (consume_cred) {
                        if (conn->ibc_credits == 0) {   /* no credits */
                                CDEBUG(D_NET, "%s: no credits\n",
                                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                                break;
                        }
                        
                        if (conn->ibc_credits == 1 &&   /* last credit reserved for */
                            conn->ibc_outstanding_credits == 0) { /* giving back credits */
                                CDEBUG(D_NET, "%s: not using last credit\n",
                                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                                break;
                        }
                }
                
                list_del (&tx->tx_list);
                tx->tx_queued = 0;

                /* NB don't drop ibc_lock before bumping tx_sending */

                /* A NOOP is pointless if another send is queued (it carries
                 * the credits itself) or no credits need returning: recycle */
                if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
                    (!list_empty(&conn->ibc_tx_queue) ||
                     !list_empty(&conn->ibc_tx_queue_nocred) ||
                     (conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER &&
                      !kibnal_send_keepalive(conn)))) {
                        /* redundant NOOP */
                        spin_unlock(&conn->ibc_lock);
                        kibnal_tx_done(tx);
                        spin_lock(&conn->ibc_lock);
                        CDEBUG(D_NET, "%s: redundant noop\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        continue;
                }

                /* Stamp the message with the credits being returned and the
                 * connection's tx sequence number */
                kibnal_pack_msg(tx->tx_msg, conn->ibc_version,
                                conn->ibc_outstanding_credits,
                                conn->ibc_peer->ibp_nid, conn->ibc_incarnation,
                                conn->ibc_txseq);

                conn->ibc_txseq++;
                conn->ibc_outstanding_credits = 0;      /* credits now in flight */
                conn->ibc_nsends_posted++;
                if (consume_cred)
                        conn->ibc_credits--;

                /* CAVEAT EMPTOR!  This tx could be the PUT_DONE of an RDMA
                 * PUT.  If so, it was first queued here as a PUT_REQ, sent and
                 * stashed on ibc_active_txs, matched by an incoming PUT_ACK,
                 * and then re-queued here.  It's (just) possible that
                 * tx_sending is non-zero if we've not done the tx_complete() from
                 * the first send; hence the ++ rather than = below. */
                tx->tx_sending++;

                list_add (&tx->tx_list, &conn->ibc_active_txs);

                LASSERT (tx->tx_nwrq > 0);

                rc = 0;
                frc = FSUCCESS;
                if (conn->ibc_state != IBNAL_CONN_ESTABLISHED) {
                        rc = -ECONNABORTED;
                } else {
                        frc = iba_post_send2(conn->ibc_qp, tx->tx_wrq, NULL);
                        if (frc != FSUCCESS)
                                rc = -EIO;
                }

                conn->ibc_last_send = jiffies;

                if (rc != 0) {
                        /* Post failed: undo the accounting above and complete
                         * the tx with failure.
                         * NB credits are transferred in the actual
                         * message, which can only be the last work item */
                        conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
                        if (consume_cred)
                                conn->ibc_credits++;
                        conn->ibc_nsends_posted--;

                        tx->tx_status = rc;
                        tx->tx_waiting = 0;
                        tx->tx_sending--;
                        
                        /* tx idle unless an earlier send is still completing */
                        done = (tx->tx_sending == 0);
                        if (done)
                                list_del (&tx->tx_list);
                        
                        spin_unlock(&conn->ibc_lock);
                        
                        /* report the verb status (frc) if the conn was live,
                         * otherwise just the errno at debug level */
                        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
                                CERROR ("Error %d posting transmit to %s\n", 
                                        frc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        else
                                CDEBUG (D_NET, "Error %d posting transmit to %s\n",
                                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));

                        kibnal_close_conn (conn, rc);

                        if (done)
                                kibnal_tx_done (tx);
                        return;
                }
        }

        spin_unlock(&conn->ibc_lock);
}
1002
/* Handle a send-side work completion: update credit/send accounting,
 * complete the tx if it is now idle, and close the connection on error.
 * Runs in scheduler context (CQ callback path). */
void
kibnal_tx_complete (IB_WORK_COMPLETION *wc)
{
        kib_tx_t     *tx = (kib_tx_t *)kibnal_wreqid2ptr(wc->WorkReqId);
        kib_conn_t   *conn = tx->tx_conn;
        int           failed = wc->Status != WRStatusSuccess;
        int           idle;

        CDEBUG(D_NET, "%s: sending %d nwrq %d status %d\n", 
               libcfs_nid2str(conn->ibc_peer->ibp_nid),
               tx->tx_sending, tx->tx_nwrq, wc->Status);

        LASSERT (tx->tx_sending > 0);

        /* First failure on a live connection: dump diagnostic detail
         * (work requests, scatter/gather and lnet message layout when
         * detailed debug is compiled in) */
        if (failed &&
            tx->tx_status == 0 &&
            conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
#if KIBLND_DETAILED_DEBUG
                int                   i;
                IB_WORK_REQ2         *wrq = &tx->tx_wrq[0];
                IB_LOCAL_DATASEGMENT *gl = &tx->tx_gl[0];
                lnet_msg_t           *lntmsg = tx->tx_lntmsg[0];
#endif
                CDEBUG(D_NETERROR, "tx -> %s type %x cookie "LPX64
                       " sending %d waiting %d failed %d nwrk %d\n", 
                       libcfs_nid2str(conn->ibc_peer->ibp_nid),
                       tx->tx_msg->ibm_type, tx->tx_cookie,
                       tx->tx_sending, tx->tx_waiting, wc->Status,
                       tx->tx_nwrq);
#if KIBLND_DETAILED_DEBUG
                /* dump each work request with its single data segment */
                for (i = 0; i < tx->tx_nwrq; i++, wrq++, gl++) {
                        switch (wrq->Operation) {
                        default:
                                CDEBUG(D_NETERROR, "    [%3d] Addr %p Next %p OP %d "
                                       "DSList %p(%p)/%d: "LPX64"/%d K %x\n",
                                       i, wrq, wrq->Next, wrq->Operation,
                                       wrq->DSList, gl, wrq->DSListDepth,
                                       gl->Address, gl->Length, gl->Lkey);
                                break;
                        case WROpSend:
                                CDEBUG(D_NETERROR, "    [%3d] Addr %p Next %p SEND "
                                       "DSList %p(%p)/%d: "LPX64"/%d K %x\n",
                                       i, wrq, wrq->Next, 
                                       wrq->DSList, gl, wrq->DSListDepth,
                                       gl->Address, gl->Length, gl->Lkey);
                                break;
                        case WROpRdmaWrite:
                                CDEBUG(D_NETERROR, "    [%3d] Addr %p Next %p DMA "
                                       "DSList: %p(%p)/%d "LPX64"/%d K %x -> "
                                       LPX64" K %x\n",
                                       i, wrq, wrq->Next, 
                                       wrq->DSList, gl, wrq->DSListDepth,
                                       gl->Address, gl->Length, gl->Lkey,
                                       wrq->Req.SendRC.RemoteDS.Address,
                                       wrq->Req.SendRC.RemoteDS.Rkey);
                                break;
                        }
                }
                
                /* dump the RDMA descriptor and the lnet payload layout for
                 * messages that carried an RDMA */
                switch (tx->tx_msg->ibm_type) {
                default:
                        CDEBUG(D_NETERROR, "  msg type %x %p/%d, No RDMA\n", 
                               tx->tx_msg->ibm_type, 
                               tx->tx_msg, tx->tx_msg->ibm_nob);
                        break;

                case IBNAL_MSG_PUT_DONE:
                case IBNAL_MSG_GET_DONE:
                        CDEBUG(D_NETERROR, "  msg type %x %p/%d, RDMA key %x frags %d...\n", 
                               tx->tx_msg->ibm_type, 
                               tx->tx_msg, tx->tx_msg->ibm_nob,
                               tx->tx_rd->rd_key, tx->tx_rd->rd_nfrag);
                        for (i = 0; i < tx->tx_rd->rd_nfrag; i++)
                                CDEBUG(D_NETERROR, "    [%d] "LPX64"/%d\n", i,
                                       tx->tx_rd->rd_frags[i].rf_addr,
                                       tx->tx_rd->rd_frags[i].rf_nob);
                        if (lntmsg == NULL) {
                                CDEBUG(D_NETERROR, "  No lntmsg\n");
                        } else if (lntmsg->msg_iov != NULL) {
                                CDEBUG(D_NETERROR, "  lntmsg in %d VIRT frags...\n", 
                                       lntmsg->msg_niov);
                                for (i = 0; i < lntmsg->msg_niov; i++)
                                        CDEBUG(D_NETERROR, "    [%d] %p/%d\n", i,
                                               lntmsg->msg_iov[i].iov_base,
                                               lntmsg->msg_iov[i].iov_len);
                        } else if (lntmsg->msg_kiov != NULL) {
                                CDEBUG(D_NETERROR, "  lntmsg in %d PAGE frags...\n", 
                                       lntmsg->msg_niov);
                                for (i = 0; i < lntmsg->msg_niov; i++)
                                        CDEBUG(D_NETERROR, "    [%d] %p+%d/%d\n", i,
                                               lntmsg->msg_kiov[i].kiov_page,
                                               lntmsg->msg_kiov[i].kiov_offset,
                                               lntmsg->msg_kiov[i].kiov_len);
                        } else {
                                CDEBUG(D_NETERROR, "  lntmsg in %d frags\n", 
                                       lntmsg->msg_niov);
                        }
                        
                        break;
                }
#endif
        }
        
        spin_lock(&conn->ibc_lock);

        /* I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'. */

        tx->tx_sending--;
        conn->ibc_nsends_posted--;      /* QP send-queue slot freed */

        if (failed) {
                tx->tx_waiting = 0;     /* don't wait for peer */
                tx->tx_status = -EIO;
        }
        
        idle = (tx->tx_sending == 0) &&         /* This is the final callback */
               !tx->tx_waiting &&               /* Not waiting for peer */
               !tx->tx_queued;                  /* Not re-queued (PUT_DONE) */
        if (idle)
                list_del(&tx->tx_list);         /* off ibc_active_txs */

        kibnal_conn_addref(conn);               /* 1 ref for me.... */

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kibnal_tx_done (tx);            /* may drop tx's conn ref */

        if (failed) {
                kibnal_close_conn (conn, -EIO);
        } else {
                /* successful completion proves the peer is alive; try to
                 * post more sends now a slot is free */
                kibnal_peer_alive(conn->ibc_peer);
                kibnal_check_sends(conn);
        }

        kibnal_conn_decref(conn);               /* ...until here */
}
1141
1142 void
1143 kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob)
1144 {
1145         IB_LOCAL_DATASEGMENT *gl = &tx->tx_gl[tx->tx_nwrq];
1146         IB_WORK_REQ2         *wrq = &tx->tx_wrq[tx->tx_nwrq];
1147         int                   nob = offsetof (kib_msg_t, ibm_u) + body_nob;
1148
1149         LASSERT (tx->tx_nwrq >= 0 && 
1150                  tx->tx_nwrq < (1 + IBNAL_MAX_RDMA_FRAGS));
1151         LASSERT (nob <= IBNAL_MSG_SIZE);
1152
1153         kibnal_init_msg(tx->tx_msg, type, body_nob);
1154
1155         *gl = (IB_LOCAL_DATASEGMENT) {
1156                 .Address = tx->tx_hca_msg,
1157                 .Length  = IBNAL_MSG_SIZE,
1158                 .Lkey    = kibnal_data.kib_whole_mem.md_lkey,
1159         };
1160
1161         wrq->Next           = NULL;             /* This is the last one */
1162
1163         wrq->WorkReqId      = kibnal_ptr2wreqid(tx, IBNAL_WID_TX);
1164         wrq->Operation      = WROpSend;
1165         wrq->DSList         = gl;
1166         wrq->DSListDepth    = 1;
1167         wrq->MessageLen     = nob;
1168         wrq->Req.SendRC.ImmediateData  = 0;
1169         wrq->Req.SendRC.Options.s.SolicitedEvent         = 1;
1170         wrq->Req.SendRC.Options.s.SignaledCompletion     = 1;
1171         wrq->Req.SendRC.Options.s.ImmediateData          = 0;
1172         wrq->Req.SendRC.Options.s.Fence                  = 0; 
1173         /* fence only needed on RDMA reads */
1174         
1175         tx->tx_nwrq++;
1176 }
1177
/* Build the RDMA-write work requests that move 'nob' bytes from this tx's
 * source descriptor (tx->tx_rd) into 'dstrd' on the peer, then append the
 * GET_DONE/PUT_DONE completion message carrying 'dstcookie'.
 * Returns nob on success or a -ve errno; on failure no RDMA wrqs remain
 * and only the (failed) completion message is sent. */
int
kibnal_init_rdma (kib_tx_t *tx, int type, int nob,
                  kib_rdma_desc_t *dstrd, __u64 dstcookie)
{
        kib_msg_t            *ibmsg = tx->tx_msg;
        kib_rdma_desc_t      *srcrd = tx->tx_rd;
        IB_LOCAL_DATASEGMENT *gl;
        IB_WORK_REQ2         *wrq;
        int                   rc;

#if IBNAL_USE_FMR
        /* FMR: both sides are a single virtually-contiguous region, so one
         * RDMA-write wrq suffices */
        LASSERT (tx->tx_nwrq == 0);

        gl = &tx->tx_gl[0];
        gl->Length  = nob;
        gl->Address = srcrd->rd_addr;
        gl->Lkey    = srcrd->rd_key;

        wrq = &tx->tx_wrq[0];

        /* chains into the completion-message wrq appended below */
        wrq->Next           = wrq + 1;
        wrq->WorkReqId      = kibnal_ptr2wreqid(tx, IBNAL_WID_RDMA);
        wrq->Operation      = WROpRdmaWrite;
        wrq->DSList         = gl;
        wrq->DSListDepth    = 1;
        wrq->MessageLen     = nob;

        /* unsignalled: completion is inferred from the SEND that follows */
        wrq->Req.SendRC.ImmediateData                = 0;
        wrq->Req.SendRC.Options.s.SolicitedEvent     = 0;
        wrq->Req.SendRC.Options.s.SignaledCompletion = 0;
        wrq->Req.SendRC.Options.s.ImmediateData      = 0;
        wrq->Req.SendRC.Options.s.Fence              = 0; 

        wrq->Req.SendRC.RemoteDS.Address = dstrd->rd_addr;
        wrq->Req.SendRC.RemoteDS.Rkey    = dstrd->rd_key;

        tx->tx_nwrq = 1;
        rc = nob;
#else
        /* CAVEAT EMPTOR: this 'consumes' the frags in 'dstrd'
         * NOTE(review): the loop below also advances/shrinks srcrd's frags
         * in place, so srcrd is consumed too — callers must not reuse it */
        int              resid = nob;           /* bytes still to describe */
        kib_rdma_frag_t *srcfrag;
        int              srcidx;
        kib_rdma_frag_t *dstfrag;
        int              dstidx;
        int              wrknob;                /* bytes in this work request */

        /* Called by scheduler */
        LASSERT (!in_interrupt());

        LASSERT (type == IBNAL_MSG_GET_DONE ||
                 type == IBNAL_MSG_PUT_DONE);

        srcidx = dstidx = 0;
        srcfrag = &srcrd->rd_frags[0];
        dstfrag = &dstrd->rd_frags[0];
        rc = resid;

        /* Walk src and dst fragment lists in step, emitting one RDMA-write
         * wrq per overlapping extent */
        while (resid > 0) {
                if (srcidx >= srcrd->rd_nfrag) {
                        CERROR("Src buffer exhausted: %d frags\n", srcidx);
                        rc = -EPROTO;
                        break;
                }
                
                if (dstidx == dstrd->rd_nfrag) {
                        CERROR("Dst buffer exhausted: %d frags\n", dstidx);
                        rc = -EPROTO;
                        break;
                }

                if (tx->tx_nwrq == IBNAL_MAX_RDMA_FRAGS) {
                        CERROR("RDMA too fragmented: %d/%d src %d/%d dst frags\n",
                               srcidx, srcrd->rd_nfrag,
                               dstidx, dstrd->rd_nfrag);
                        rc = -EMSGSIZE;
                        break;
                }

                /* largest extent contiguous in both src and dst */
                wrknob = MIN(MIN(srcfrag->rf_nob, dstfrag->rf_nob), resid);

                gl = &tx->tx_gl[tx->tx_nwrq];
                gl->Length  = wrknob;
                gl->Address = srcfrag->rf_addr;
                gl->Lkey    = srcrd->rd_key;

                wrq = &tx->tx_wrq[tx->tx_nwrq];

                /* chains into the next wrq; kibnal_init_tx_msg() fills the
                 * final slot and NULL-terminates the chain */
                wrq->Next           = wrq + 1;
                wrq->WorkReqId      = kibnal_ptr2wreqid(tx, IBNAL_WID_RDMA);
                wrq->Operation      = WROpRdmaWrite;
                wrq->DSList         = gl;
                wrq->DSListDepth    = 1;
                /* NOTE(review): MessageLen is the total 'nob' on every
                 * fragment's wrq rather than 'wrknob' — presumably ignored
                 * for RDMA writes; confirm against the IbAccess spec */
                wrq->MessageLen     = nob;

                wrq->Req.SendRC.ImmediateData                = 0;
                wrq->Req.SendRC.Options.s.SolicitedEvent     = 0;
                wrq->Req.SendRC.Options.s.SignaledCompletion = 0;
                wrq->Req.SendRC.Options.s.ImmediateData      = 0;
                wrq->Req.SendRC.Options.s.Fence              = 0; 

                wrq->Req.SendRC.RemoteDS.Address = dstfrag->rf_addr;
                wrq->Req.SendRC.RemoteDS.Rkey    = dstrd->rd_key;

                resid -= wrknob;

                /* advance whichever side(s) this extent exhausted */
                if (wrknob < srcfrag->rf_nob) {
                        srcfrag->rf_addr += wrknob;
                        srcfrag->rf_nob -= wrknob;
                } else {
                        srcfrag++;
                        srcidx++;
                }
                
                if (wrknob < dstfrag->rf_nob) {
                        dstfrag->rf_addr += wrknob;
                        dstfrag->rf_nob -= wrknob;
                } else {
                        dstfrag++;
                        dstidx++;
                }
                
                tx->tx_nwrq++;
        }

        if (rc < 0)                             /* no RDMA if completing with failure */
                tx->tx_nwrq = 0;
#endif
        
        /* completion message reports the RDMA outcome to the peer */
        ibmsg->ibm_u.completion.ibcm_status = rc;
        ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;
        kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));

        return rc;
}
1312
/* Queue 'tx' on 'conn' under ibc_lock, then try to post it. */
void
kibnal_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
{
        spin_lock(&conn->ibc_lock);
        kibnal_queue_tx_locked (tx, conn);
        spin_unlock(&conn->ibc_lock);
        
        /* kick the sender (takes ibc_lock itself) */
        kibnal_check_sends(conn);
}
1322
1323 void
1324 kibnal_schedule_active_connect_locked (kib_peer_t *peer, int proto_version)
1325 {
1326         /* Called holding kib_global_lock exclusive with IRQs disabled */
1327
1328         peer->ibp_version = proto_version;      /* proto version for new conn */
1329         peer->ibp_connecting++;                 /* I'm connecting */
1330         kibnal_peer_addref(peer);               /* extra ref for connd */
1331
1332         spin_lock(&kibnal_data.kib_connd_lock);
1333
1334         list_add_tail (&peer->ibp_connd_list, &kibnal_data.kib_connd_peers);
1335         wake_up (&kibnal_data.kib_connd_waitq);
1336
1337         spin_unlock(&kibnal_data.kib_connd_lock);
1338 }
1339
/* As kibnal_schedule_active_connect_locked(), but takes kib_global_lock
 * itself. */
void
kibnal_schedule_active_connect (kib_peer_t *peer, int proto_version)
{
        unsigned long flags;

        write_lock_irqsave(&kibnal_data.kib_global_lock, flags);

        kibnal_schedule_active_connect_locked(peer, proto_version);

        write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
}
1351
/* Send 'tx' to 'nid': queue it on an existing connection, or create the
 * peer/connection as needed and queue the tx pending the connect.  Having
 * committed to send, any failure completes the tx with an error. */
void
kibnal_launch_tx (kib_tx_t *tx, lnet_nid_t nid)
{
        kib_peer_t      *peer;
        kib_conn_t      *conn;
        unsigned long    flags;
        rwlock_t        *g_lock = &kibnal_data.kib_global_lock;
        int              retry;
        int              rc;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */
        
        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
        LASSERT (tx->tx_nwrq > 0);              /* work items have been set up */

        /* First pass may create a persistent peer; the retry pass must then
         * find it or give up */
        for (retry = 0; ; retry = 1) {
                read_lock_irqsave(g_lock, flags);
        
                peer = kibnal_find_peer_locked (nid);
                if (peer != NULL) {
                        conn = kibnal_find_conn_locked (peer);
                        if (conn != NULL) {
                                kibnal_conn_addref(conn); /* 1 ref for me... */
                                read_unlock_irqrestore(g_lock, flags);

                                kibnal_queue_tx (tx, conn);
                                kibnal_conn_decref(conn); /* ...to here */
                                return;
                        }
                }
                
                /* Making one or more connections; I'll need a write lock...
                 * NB plain read_unlock/write_lock keeps IRQs disabled across
                 * the switch; 'flags' is restored by the matching
                 * write_unlock_irqrestore() */
                read_unlock(g_lock);
                write_lock(g_lock);

                peer = kibnal_find_peer_locked (nid);
                if (peer != NULL)
                        break;                  /* exits holding write lock */

                write_unlock_irqrestore(g_lock, flags);

                if (retry) {
                        /* already added the peer once and it's still gone */
                        CERROR("Can't find peer %s\n", libcfs_nid2str(nid));

                        tx->tx_status = -EHOSTUNREACH;
                        tx->tx_waiting = 0;
                        kibnal_tx_done (tx);
                        return;
                }

                rc = kibnal_add_persistent_peer(nid);
                if (rc != 0) {
                        CERROR("Can't add peer %s: %d\n",
                               libcfs_nid2str(nid), rc);
                        
                        tx->tx_status = -EHOSTUNREACH;
                        tx->tx_waiting = 0;
                        kibnal_tx_done (tx);
                        return;
                }
        }

        /* Holding the write lock; a conn may have appeared since the read-
         * locked check above */
        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kibnal_conn_addref(conn);       /* 1 ref for me... */
                write_unlock_irqrestore(g_lock, flags);
                
                kibnal_queue_tx (tx, conn);
                kibnal_conn_decref(conn);       /* ...until here */
                return;
        }

        if (!kibnal_peer_connecting(peer)) {
                /* not already connecting: only retry before the reconnect
                 * backoff deadline (or on the very first attempt) */
                if (!(peer->ibp_reconnect_interval == 0 || /* first attempt */
                      time_after_eq(jiffies, peer->ibp_reconnect_time))) {
                        write_unlock_irqrestore(g_lock, flags);
                        tx->tx_status = -EHOSTUNREACH;
                        tx->tx_waiting = 0;
                        kibnal_tx_done (tx);
                        return;
                }

                kibnal_schedule_active_connect_locked(peer, IBNAL_MSG_VERSION);
        }
        
        /* A connection is being established; queue the message... */
        list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}
1444
1445 void
1446 kibnal_txlist_done (struct list_head *txlist, int status)
1447 {
1448         kib_tx_t *tx;
1449
1450         while (!list_empty (txlist)) {
1451                 tx = list_entry (txlist->next, kib_tx_t, tx_list);
1452
1453                 list_del (&tx->tx_list);
1454                 /* complete now */
1455                 tx->tx_waiting = 0;
1456                 tx->tx_status = status;
1457                 kibnal_tx_done (tx);
1458         }
1459 }
1460
1461 int
1462 kibnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1463 {
1464         lnet_hdr_t       *hdr = &lntmsg->msg_hdr; 
1465         int               type = lntmsg->msg_type; 
1466         lnet_process_id_t target = lntmsg->msg_target;
1467         int               target_is_router = lntmsg->msg_target_is_router;
1468         int               routing = lntmsg->msg_routing;
1469         unsigned int      payload_niov = lntmsg->msg_niov; 
1470         struct iovec     *payload_iov = lntmsg->msg_iov; 
1471         lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
1472         unsigned int      payload_offset = lntmsg->msg_offset;
1473         unsigned int      payload_nob = lntmsg->msg_len;
1474         kib_msg_t        *ibmsg;
1475         kib_tx_t         *tx;
1476         int               nob;
1477         int               rc;
1478
1479         /* NB 'private' is different depending on what we're sending.... */
1480
1481         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1482                payload_nob, payload_niov, libcfs_id2str(target));
1483
1484         LASSERT (payload_nob == 0 || payload_niov > 0);
1485         LASSERT (payload_niov <= LNET_MAX_IOV);
1486
1487         /* Thread context */
1488         LASSERT (!in_interrupt());
1489         /* payload is either all vaddrs or all pages */
1490         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1491
1492         switch (type) {
1493         default:
1494                 LBUG();
1495                 return (-EIO);
1496                 
1497         case LNET_MSG_ACK:
1498                 LASSERT (payload_nob == 0);
1499                 break;
1500
1501         case LNET_MSG_GET:
1502                 if (routing || target_is_router)
1503                         break;                  /* send IMMEDIATE */
1504                 
1505                 /* is the REPLY message too small for RDMA? */
1506                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
1507                 if (nob <= IBNAL_MSG_SIZE)
1508                         break;                  /* send IMMEDIATE */
1509
1510                 tx = kibnal_get_idle_tx();
1511                 if (tx == NULL) {
1512                         CERROR("Can allocate txd for GET to %s: \n",
1513                                libcfs_nid2str(target.nid));
1514                         return -ENOMEM;
1515                 }
1516                 
1517                 ibmsg = tx->tx_msg;
1518                 ibmsg->ibm_u.get.ibgm_hdr = *hdr;
1519                 ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie;
1520
1521                 if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
1522                         rc = kibnal_setup_rd_iov(tx, &ibmsg->ibm_u.get.ibgm_rd,
1523                                                  0,
1524                                                  lntmsg->msg_md->md_niov,
1525                                                  lntmsg->msg_md->md_iov.iov,
1526                                                  0, lntmsg->msg_md->md_length);
1527                 else
1528                         rc = kibnal_setup_rd_kiov(tx, &ibmsg->ibm_u.get.ibgm_rd,
1529                                                   0,
1530                                                   lntmsg->msg_md->md_niov,
1531                                                   lntmsg->msg_md->md_iov.kiov,
1532                                                   0, lntmsg->msg_md->md_length);
1533                 if (rc != 0) {
1534                         CERROR("Can't setup GET sink for %s: %d\n",
1535                                libcfs_nid2str(target.nid), rc);
1536                         kibnal_tx_done(tx);
1537                         return -EIO;
1538                 }
1539
1540 #if IBNAL_USE_FMR
1541                 nob = sizeof(kib_get_msg_t);
1542 #else
1543                 {
1544                         int n = ibmsg->ibm_u.get.ibgm_rd.rd_nfrag;
1545                         
1546                         nob = offsetof(kib_get_msg_t, ibgm_rd.rd_frags[n]);
1547                 }
1548 #endif
1549                 kibnal_init_tx_msg(tx, IBNAL_MSG_GET_REQ, nob);
1550
1551                 tx->tx_lntmsg[1] = lnet_create_reply_msg(kibnal_data.kib_ni,
1552                                                          lntmsg);
1553                 if (tx->tx_lntmsg[1] == NULL) {
1554                         CERROR("Can't create reply for GET -> %s\n",
1555                                libcfs_nid2str(target.nid));
1556                         kibnal_tx_done(tx);
1557                         return -EIO;
1558                 }
1559
1560                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg[0,1] on completion */
1561                 tx->tx_waiting = 1;             /* waiting for GET_DONE */
1562                 kibnal_launch_tx(tx, target.nid);
1563                 return 0;
1564
1565         case LNET_MSG_REPLY: 
1566         case LNET_MSG_PUT:
1567                 /* Is the payload small enough not to need RDMA? */
1568                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1569                 if (nob <= IBNAL_MSG_SIZE)
1570                         break;                  /* send IMMEDIATE */
1571
1572                 tx = kibnal_get_idle_tx();
1573                 if (tx == NULL) {
1574                         CERROR("Can't allocate %s txd for %s\n",
1575                                type == LNET_MSG_PUT ? "PUT" : "REPLY",
1576                                libcfs_nid2str(target.nid));
1577                         return -ENOMEM;
1578                 }
1579
1580                 if (payload_kiov == NULL)
1581                         rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 1,
1582                                                  payload_niov, payload_iov,
1583                                                  payload_offset, payload_nob);
1584                 else
1585                         rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 1,
1586                                                   payload_niov, payload_kiov,
1587                                                   payload_offset, payload_nob);
1588                 if (rc != 0) {
1589                         CERROR("Can't setup PUT src for %s: %d\n",
1590                                libcfs_nid2str(target.nid), rc);
1591                         kibnal_tx_done(tx);
1592                         return -EIO;
1593                 }
1594
1595                 ibmsg = tx->tx_msg;
1596                 ibmsg->ibm_u.putreq.ibprm_hdr = *hdr;
1597                 ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie;
1598                 kibnal_init_tx_msg(tx, IBNAL_MSG_PUT_REQ, sizeof(kib_putreq_msg_t));
1599
1600                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1601                 tx->tx_waiting = 1;             /* waiting for PUT_{ACK,NAK} */
1602                 kibnal_launch_tx(tx, target.nid);
1603                 return 0;
1604         }
1605
1606         /* send IMMEDIATE */
1607
1608         LASSERT (offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob])
1609                  <= IBNAL_MSG_SIZE);
1610
1611         tx = kibnal_get_idle_tx();
1612         if (tx == NULL) {
1613                 CERROR ("Can't send %d to %s: tx descs exhausted\n",
1614                         type, libcfs_nid2str(target.nid));
1615                 return -ENOMEM;
1616         }
1617
1618         ibmsg = tx->tx_msg;
1619         ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1620
1621         if (payload_kiov != NULL)
1622                 lnet_copy_kiov2flat(IBNAL_MSG_SIZE, ibmsg,
1623                                     offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1624                                     payload_niov, payload_kiov,
1625                                     payload_offset, payload_nob);
1626         else
1627                 lnet_copy_iov2flat(IBNAL_MSG_SIZE, ibmsg,
1628                                    offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1629                                    payload_niov, payload_iov,
1630                                    payload_offset, payload_nob);
1631
1632         nob = offsetof(kib_immediate_msg_t, ibim_payload[payload_nob]);
1633         kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE, nob);
1634
1635         tx->tx_lntmsg[0] = lntmsg;              /* finalise lntmsg on completion */
1636         kibnal_launch_tx(tx, target.nid);
1637         return 0;
1638 }
1639
/* Send the payload of 'lntmsg' as the reply to an optimized GET: RDMA it
 * directly into the sink descriptor the peer supplied in its GET_REQ 'rx',
 * then queue a GET_DONE completion.  'lntmsg' is finalised immediately when
 * no RDMA is needed, on RDMA completion otherwise, or with -EIO on any
 * failure. */
void
kibnal_reply(lnet_ni_t *ni, kib_rx_t *rx, lnet_msg_t *lntmsg)
{
        lnet_process_id_t target = lntmsg->msg_target;
        unsigned int      niov = lntmsg->msg_niov; 
        struct iovec     *iov = lntmsg->msg_iov; 
        lnet_kiov_t      *kiov = lntmsg->msg_kiov;
        unsigned int      offset = lntmsg->msg_offset;
        unsigned int      nob = lntmsg->msg_len;
        kib_tx_t         *tx;
        int               rc;
        
        tx = kibnal_get_idle_tx();
        if (tx == NULL) {
                CERROR("Can't get tx for REPLY to %s\n",
                       libcfs_nid2str(target.nid));
                goto failed_0;
        }

        /* Map the reply payload as the local RDMA source; a zero-length
         * reply needs no mapping at all. */
        if (nob == 0)
                rc = 0;
        else if (kiov == NULL)
                rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 1, 
                                         niov, iov, offset, nob);
        else
                rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 1, 
                                          niov, kiov, offset, nob);

        if (rc != 0) {
                CERROR("Can't setup GET src for %s: %d\n",
                       libcfs_nid2str(target.nid), rc);
                goto failed_1;
        }
        
        /* rc < 0: error; rc == 0: nothing to RDMA; rc > 0: RDMA scheduled */
        rc = kibnal_init_rdma(tx, IBNAL_MSG_GET_DONE, nob,
                              &rx->rx_msg->ibm_u.get.ibgm_rd,
                              rx->rx_msg->ibm_u.get.ibgm_cookie);
        if (rc < 0) {
                CERROR("Can't setup rdma for GET from %s: %d\n", 
                       libcfs_nid2str(target.nid), rc);
                goto failed_1;
        }
        
        if (rc == 0) {
                /* No RDMA: local completion may happen now! */
                lnet_finalize(ni, lntmsg, 0);
        } else {
                /* RDMA: lnet_finalize(lntmsg) when it
                 * completes */
                tx->tx_lntmsg[0] = lntmsg;
        }
        
        kibnal_queue_tx(tx, rx->rx_conn);
        return;
        
 failed_1:
        kibnal_tx_done(tx);
 failed_0:
        lnet_finalize(ni, lntmsg, -EIO);
}
1700
1701 int
1702 kibnal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
1703                    void **new_private)
1704 {
1705         kib_rx_t    *rx = private;
1706         kib_conn_t  *conn = rx->rx_conn;
1707
1708         if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {
1709                 /* Can't block if RDMA completions need normal credits */
1710                 LCONSOLE_ERROR_MSG(0x12d,  "Dropping message from %s: no "
1711                                    "buffers free. %s is running an old version"
1712                                    " of LNET that may deadlock if messages "
1713                                    "wait for buffers)\n",
1714                                    libcfs_nid2str(conn->ibc_peer->ibp_nid),
1715                                    libcfs_nid2str(conn->ibc_peer->ibp_nid));
1716                 return -EDEADLK;
1717         }
1718         
1719         *new_private = private;
1720         return 0;
1721 }
1722
/* Deliver the payload of a received message into the MD that lnet_parse()
 * matched.  'mlen' is how much of the 'rlen'-byte payload the MD accepts.
 * Returns 0 or a negative errno; always re-posts 'rx' before returning
 * (withholding the peer's credit where the peer still owns the rx). */
int
kibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
             unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
             unsigned int offset, unsigned int mlen, unsigned int rlen)
{
        kib_rx_t    *rx = private;
        kib_msg_t   *rxmsg = rx->rx_msg;
        kib_conn_t  *conn = rx->rx_conn;
        kib_tx_t    *tx;
        kib_msg_t   *txmsg;
        int          nob;
        int          post_cred = 1;
        int          rc = 0;
        
        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch (rxmsg->ibm_type) {
        default:
                LBUG();
                
        case IBNAL_MSG_IMMEDIATE:
                /* Payload arrived inline with the message: just copy it out */
                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
                if (nob > rx->rx_nob) {
                        CERROR ("Immediate message from %s too big: %d(%d)\n",
                                libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
                                nob, rx->rx_nob);
                        rc = -EPROTO;
                        break;
                }

                if (kiov != NULL)
                        lnet_copy_flat2kiov(niov, kiov, offset,
                                            IBNAL_MSG_SIZE, rxmsg,
                                            offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                            mlen);
                else
                        lnet_copy_flat2iov(niov, iov, offset,
                                           IBNAL_MSG_SIZE, rxmsg,
                                           offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                           mlen);
                lnet_finalize (ni, lntmsg, 0);
                break;

        case IBNAL_MSG_PUT_REQ:
                /* Describe the sink buffer in a PUT_ACK; the peer RDMAs the
                 * payload into it and then sends PUT_DONE */
                if (mlen == 0) {
                        /* Nothing wanted: NAK so the peer doesn't RDMA */
                        lnet_finalize(ni, lntmsg, 0);
                        kibnal_send_completion(rx->rx_conn, IBNAL_MSG_PUT_NAK, 0,
                                               rxmsg->ibm_u.putreq.ibprm_cookie);
                        break;
                }
                
                tx = kibnal_get_idle_tx();
                if (tx == NULL) {
                        CERROR("Can't allocate tx for %s\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        /* Not replying will break the connection */
                        rc = -ENOMEM;
                        break;
                }

                txmsg = tx->tx_msg;
                if (kiov == NULL)
                        rc = kibnal_setup_rd_iov(tx, 
                                                 &txmsg->ibm_u.putack.ibpam_rd,
                                                 0,
                                                 niov, iov, offset, mlen);
                else
                        rc = kibnal_setup_rd_kiov(tx,
                                                  &txmsg->ibm_u.putack.ibpam_rd,
                                                  0,
                                                  niov, kiov, offset, mlen);
                if (rc != 0) {
                        CERROR("Can't setup PUT sink for %s: %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
                        kibnal_tx_done(tx);
                        /* tell peer it's over */
                        kibnal_send_completion(rx->rx_conn, IBNAL_MSG_PUT_NAK, rc,
                                               rxmsg->ibm_u.putreq.ibprm_cookie);
                        break;
                }

                txmsg->ibm_u.putack.ibpam_src_cookie = rxmsg->ibm_u.putreq.ibprm_cookie;
                txmsg->ibm_u.putack.ibpam_dst_cookie = tx->tx_cookie;
#if IBNAL_USE_FMR
                nob = sizeof(kib_putack_msg_t);
#else
                {
                        /* Message is only as long as the fragments it carries */
                        int n = tx->tx_msg->ibm_u.putack.ibpam_rd.rd_nfrag;

                        nob = offsetof(kib_putack_msg_t, ibpam_rd.rd_frags[n]);
                }
#endif
                kibnal_init_tx_msg(tx, IBNAL_MSG_PUT_ACK, nob);

                tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
                tx->tx_waiting = 1;             /* waiting for PUT_DONE */
                kibnal_queue_tx(tx, conn);

                if (conn->ibc_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD)
                        post_cred = 0; /* peer still owns 'rx' for sending PUT_DONE */
                break;

        case IBNAL_MSG_GET_REQ:
                if (lntmsg != NULL) {
                        /* Optimized GET; RDMA lntmsg's payload */
                        kibnal_reply(ni, rx, lntmsg);
                } else {
                        /* GET didn't match anything */
                        kibnal_send_completion(rx->rx_conn, IBNAL_MSG_GET_DONE, 
                                               -ENODATA,
                                               rxmsg->ibm_u.get.ibgm_cookie);
                }
                break;
        }

        kibnal_post_rx(rx, post_cred, 0);
        return rc;
}
1844
1845 int
1846 kibnal_thread_start (int (*fn)(void *arg), void *arg)
1847 {
1848         long    pid = kernel_thread (fn, arg, 0);
1849
1850         if (pid < 0)
1851                 return ((int)pid);
1852
1853         atomic_inc (&kibnal_data.kib_nthreads);
1854         return (0);
1855 }
1856
/* Per-thread teardown: remove this thread from the kib_nthreads count.
 * NOTE(review): presumably the shutdown path spins until this count hits
 * zero -- confirm against the module unload code. */
void
kibnal_thread_fini (void)
{
        atomic_dec (&kibnal_data.kib_nthreads);
}
1862
/* Record that 'peer' was heard from just now. */
void
kibnal_peer_alive (kib_peer_t *peer)
{
        /* This is racy, but everyone's only writing cfs_time_current() */
        peer->ibp_last_alive = cfs_time_current();
        mb();           /* make the new timestamp visible to other CPUs */
}
1870
1871 void
1872 kibnal_peer_notify (kib_peer_t *peer)
1873 {
1874         time_t        last_alive = 0;
1875         int           error = 0;
1876         unsigned long flags;
1877         
1878         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1879
1880         if (list_empty(&peer->ibp_conns) &&
1881             peer->ibp_accepting == 0 &&
1882             peer->ibp_connecting == 0 &&
1883             peer->ibp_error != 0) {
1884                 error = peer->ibp_error;
1885                 peer->ibp_error = 0;
1886                 last_alive = cfs_time_current_sec() -
1887                              cfs_duration_sec(cfs_time_current() -
1888                                               peer->ibp_last_alive);
1889         }
1890         
1891         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1892         
1893         if (error != 0)
1894                 lnet_notify(kibnal_data.kib_ni, peer->ibp_nid, 0, last_alive);
1895 }
1896
1897 void
1898 kibnal_schedule_conn (kib_conn_t *conn)
1899 {
1900         unsigned long flags;
1901
1902         kibnal_conn_addref(conn);               /* ++ref for connd */
1903         
1904         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
1905
1906         list_add_tail (&conn->ibc_list, &kibnal_data.kib_connd_conns);
1907         wake_up (&kibnal_data.kib_connd_waitq);
1908                 
1909         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
1910 }
1911
void
kibnal_close_conn_locked (kib_conn_t *conn, int error)
{
        /* This just does the immediate housekeeping to start shutdown of an
         * established connection.  'error' is zero for a normal shutdown.
         * Caller holds kib_global_lock exclusively in irq context */
        kib_peer_t       *peer = conn->ibc_peer;
        
        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);

        if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
                return; /* already being handled  */
        
        /* NB Can't take ibc_lock here (could be in IRQ context), without
         * risking deadlock, so access to ibc_{tx_queue,active_txs} is racey */

        /* Quiet close: no error and nothing left in flight on any queue */
        if (error == 0 &&
            list_empty(&conn->ibc_tx_queue) &&
            list_empty(&conn->ibc_tx_queue_rsrvd) &&
            list_empty(&conn->ibc_tx_queue_nocred) &&
            list_empty(&conn->ibc_active_txs)) {
                CDEBUG(D_NET, "closing conn to %s"
                       " rx# "LPD64" tx# "LPD64"\n", 
                       libcfs_nid2str(peer->ibp_nid),
                       conn->ibc_txseq, conn->ibc_rxseq);
        } else {
                /* Noisy close: report which queues still hold work */
                CDEBUG(D_NETERROR, "Closing conn to %s: error %d%s%s%s%s"
                       " rx# "LPD64" tx# "LPD64"\n",
                       libcfs_nid2str(peer->ibp_nid), error,
                       list_empty(&conn->ibc_tx_queue) ? "" : "(sending)",
                       list_empty(&conn->ibc_tx_queue_rsrvd) ? "" : "(sending_rsrvd)",
                       list_empty(&conn->ibc_tx_queue_nocred) ? "" : "(sending_nocred)",
                       list_empty(&conn->ibc_active_txs) ? "" : "(waiting)",
                       conn->ibc_txseq, conn->ibc_rxseq);
#if 0
                /* can't skip down the queue without holding ibc_lock (see above) */
                list_for_each(tmp, &conn->ibc_tx_queue) {
                        kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);
                        
                        CERROR("   queued tx type %x cookie "LPX64
                               " sending %d waiting %d ticks %ld/%d\n", 
                               tx->tx_msg->ibm_type, tx->tx_cookie, 
                               tx->tx_sending, tx->tx_waiting,
                               (long)(tx->tx_deadline - jiffies), HZ);
                }

                list_for_each(tmp, &conn->ibc_active_txs) {
                        kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);
                        
                        CERROR("   active tx type %x cookie "LPX64
                               " sending %d waiting %d ticks %ld/%d\n", 
                               tx->tx_msg->ibm_type, tx->tx_cookie, 
                               tx->tx_sending, tx->tx_waiting,
                               (long)(tx->tx_deadline - jiffies), HZ);
                }
#endif
        }

        /* Remove conn from the peer's connection list */
        list_del (&conn->ibc_list);

        if (list_empty (&peer->ibp_conns)) {   /* no more conns */
                if (peer->ibp_persistence == 0 && /* non-persistent peer */
                    kibnal_peer_active(peer))     /* still in peer table */
                        kibnal_unlink_peer_locked (peer);

                peer->ibp_error = error; /* set/clear error on last conn */
        }

        kibnal_set_conn_state(conn, IBNAL_CONN_DISCONNECTING);

        /* connd completes the disconnect out of IRQ context */
        kibnal_schedule_conn(conn);
        kibnal_conn_decref(conn);               /* lose ibc_list's ref */
}
1985
1986 void
1987 kibnal_close_conn (kib_conn_t *conn, int error)
1988 {
1989         unsigned long flags;
1990         
1991         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1992
1993         kibnal_close_conn_locked (conn, error);
1994         
1995         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1996 }
1997
/* Process every rx that arrived before the connection was fully
 * established.  The global lock only guards the ibc_early_rxs list; it is
 * dropped around each kibnal_handle_rx() call since handling may block. */
void
kibnal_handle_early_rxs(kib_conn_t *conn)
{
        unsigned long    flags;
        kib_rx_t        *rx;

        LASSERT (!in_interrupt());
        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
        
        write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
        while (!list_empty(&conn->ibc_early_rxs)) {
                rx = list_entry(conn->ibc_early_rxs.next,
                                kib_rx_t, rx_list);
                list_del(&rx->rx_list);
                write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
                
                /* NB lock dropped: new early rxs may be queued meanwhile */
                kibnal_handle_rx(rx);
                
                write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
        }
        write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
}
2020
2021 void
2022 kibnal_abort_txs(kib_conn_t *conn, struct list_head *txs)
2023 {
2024         LIST_HEAD           (zombies); 
2025         struct list_head    *tmp;
2026         struct list_head    *nxt;
2027         kib_tx_t            *tx;
2028
2029         spin_lock(&conn->ibc_lock);
2030
2031         list_for_each_safe (tmp, nxt, txs) {
2032                 tx = list_entry (tmp, kib_tx_t, tx_list);
2033
2034                 if (txs == &conn->ibc_active_txs) {
2035                         LASSERT (!tx->tx_queued);
2036                         LASSERT (tx->tx_waiting || tx->tx_sending != 0);
2037                 } else {
2038                         LASSERT (tx->tx_queued);
2039                 }
2040                 
2041                 tx->tx_status = -ECONNABORTED;
2042                 tx->tx_queued = 0;
2043                 tx->tx_waiting = 0;
2044                 
2045                 if (tx->tx_sending == 0) {
2046                         list_del (&tx->tx_list);
2047                         list_add (&tx->tx_list, &zombies);
2048                 }
2049         }
2050
2051         spin_unlock(&conn->ibc_lock);
2052
2053         kibnal_txlist_done(&zombies, -ECONNABORTED);
2054 }
2055
2056 void
2057 kibnal_conn_disconnected(kib_conn_t *conn)
2058 {
2059         static IB_QP_ATTRIBUTES_MODIFY qpam = {.RequestState = QPStateError};
2060
2061         FSTATUS           frc;
2062
2063         LASSERT (conn->ibc_state >= IBNAL_CONN_INIT_QP);
2064
2065         kibnal_set_conn_state(conn, IBNAL_CONN_DISCONNECTED);
2066
2067         /* move QP to error state to make posted work items complete */
2068         frc = iba_modify_qp(conn->ibc_qp, &qpam, NULL);
2069         if (frc != FSUCCESS)
2070                 CERROR("can't move qp state to error: %d\n", frc);
2071
2072         /* Complete all tx descs not waiting for sends to complete.
2073          * NB we should be safe from RDMA now that the QP has changed state */
2074
2075         kibnal_abort_txs(conn, &conn->ibc_tx_queue);
2076         kibnal_abort_txs(conn, &conn->ibc_tx_queue_rsrvd);
2077         kibnal_abort_txs(conn, &conn->ibc_tx_queue);
2078         kibnal_abort_txs(conn, &conn->ibc_active_txs);
2079
2080         kibnal_handle_early_rxs(conn);
2081 }
2082
/* Account for a failed connection attempt of the given 'type' on 'peer'.
 * If no other attempt is in progress and the peer didn't get connected in
 * the meantime, back off the reconnect interval, fail all blocked txs with
 * -EHOSTUNREACH and notify LNet via kibnal_peer_notify(). */
void
kibnal_peer_connect_failed (kib_peer_t *peer, int type, int error)
{
        LIST_HEAD        (zombies);
        unsigned long     flags;

        LASSERT (error != 0);
        LASSERT (!in_interrupt());

        write_lock_irqsave(&kibnal_data.kib_global_lock, flags);

        LASSERT (kibnal_peer_connecting(peer));

        /* Undo the in-progress marker for this kind of attempt */
        switch (type) {
        case IBNAL_CONN_ACTIVE:
                LASSERT (peer->ibp_connecting > 0);
                peer->ibp_connecting--;
                break;
                
        case IBNAL_CONN_PASSIVE:
                LASSERT (peer->ibp_accepting > 0);
                peer->ibp_accepting--;
                break;
                
        case IBNAL_CONN_WAITING:
                /* Can't assert; I might be racing with a successful connection
                 * which clears passivewait */
                peer->ibp_passivewait = 0;
                break;
        default:
                LBUG();
        }

        if (kibnal_peer_connecting(peer) ||     /* another attempt underway */
            !list_empty(&peer->ibp_conns)) {    /* got connected */
                write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
                return;
        }

        /* Say when active connection can be re-attempted */
        /* Exponential backoff, clamped to the tunable [min, max] interval */
        peer->ibp_reconnect_interval *= 2;
        peer->ibp_reconnect_interval =
                MAX(peer->ibp_reconnect_interval,
                    *kibnal_tunables.kib_min_reconnect_interval);
        peer->ibp_reconnect_interval =
                MIN(peer->ibp_reconnect_interval,
                    *kibnal_tunables.kib_max_reconnect_interval);
        
        peer->ibp_reconnect_time = jiffies + peer->ibp_reconnect_interval * HZ;

        /* Take peer's blocked transmits to complete with error */
        list_add(&zombies, &peer->ibp_tx_queue);
        list_del_init(&peer->ibp_tx_queue);
                
        if (kibnal_peer_active(peer) &&
            peer->ibp_persistence == 0) {
                /* failed connection attempt on non-persistent peer */
                kibnal_unlink_peer_locked (peer);
        }

        peer->ibp_error = error;
        
        write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);

        kibnal_peer_notify(peer);

        if (list_empty (&zombies))
                return;
        
        CDEBUG (D_NETERROR, "Deleting messages for %s: connection failed\n",
                libcfs_nid2str(peer->ibp_nid));

        kibnal_txlist_done (&zombies, -EHOSTUNREACH);
}
2157
/* Complete a connection attempt of the given 'type' with 'status'.
 * On failure: tear the conn down and account the failure on the peer.
 * On success: mark the conn ESTABLISHED, attach it to the peer, retire
 * stale conns from earlier peer incarnations and launch any txs that were
 * blocked waiting for a connection. */
void
kibnal_connreq_done (kib_conn_t *conn, int type, int status)
{
        kib_peer_t       *peer = conn->ibc_peer;
        struct list_head  txs;
        kib_tx_t         *tx;
        unsigned long     flags;

        LASSERT (!in_interrupt());
        LASSERT (type == IBNAL_CONN_ACTIVE || type == IBNAL_CONN_PASSIVE);
        LASSERT (conn->ibc_state >= IBNAL_CONN_INIT_QP);
        LASSERT (conn->ibc_state < IBNAL_CONN_ESTABLISHED);
        LASSERT (kibnal_peer_connecting(peer));

        /* Connection-setup scratch space is no longer needed */
        LIBCFS_FREE(conn->ibc_cvars, sizeof(*conn->ibc_cvars));
        conn->ibc_cvars = NULL;

        if (status != 0) {
                /* failed to establish connection */
                kibnal_peer_connect_failed(conn->ibc_peer, type, status);
                kibnal_conn_disconnected(conn);
                kibnal_conn_decref(conn);       /* Lose CM's ref */
                return;
        }

        /* connection established */
        LASSERT(conn->ibc_state == IBNAL_CONN_CONNECTING);

        conn->ibc_last_send = jiffies;
        kibnal_set_conn_state(conn, IBNAL_CONN_ESTABLISHED);
        kibnal_peer_alive(peer);

        CDEBUG(D_NET, "Connection %s ESTABLISHED\n",
               libcfs_nid2str(conn->ibc_peer->ibp_nid));

        write_lock_irqsave(&kibnal_data.kib_global_lock, flags);

        peer->ibp_passivewait = 0;              /* not waiting (got conn now) */
        kibnal_conn_addref(conn);               /* +1 ref for ibc_list */
        list_add_tail(&conn->ibc_list, &peer->ibp_conns);
        
        if (!kibnal_peer_active(peer)) {
                /* peer has been deleted */
                kibnal_close_conn_locked(conn, -ECONNABORTED);
                write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

                kibnal_peer_connect_failed(conn->ibc_peer, type, -ECONNABORTED);
                kibnal_conn_decref(conn);       /* lose CM's ref */
                return;
        }
        
        /* This attempt is no longer "in progress" */
        switch (type) {
        case IBNAL_CONN_ACTIVE:
                LASSERT (peer->ibp_connecting > 0);
                peer->ibp_connecting--;
                break;

        case IBNAL_CONN_PASSIVE:
                LASSERT (peer->ibp_accepting > 0);
                peer->ibp_accepting--;
                break;
        default:
                LBUG();
        }
        
        peer->ibp_reconnect_interval = 0;       /* OK to reconnect at any time */

        /* Nuke any dangling conns from a different peer instance... */
        kibnal_close_stale_conns_locked(peer, conn->ibc_incarnation);

        /* grab txs blocking for a conn */
        list_add(&txs, &peer->ibp_tx_queue);
        list_del_init(&peer->ibp_tx_queue);

        write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
        
        /* Schedule blocked txs */
        spin_lock (&conn->ibc_lock);
        while (!list_empty (&txs)) {
                tx = list_entry (txs.next, kib_tx_t, tx_list);
                list_del (&tx->tx_list);

                kibnal_queue_tx_locked (tx, conn);
        }
        spin_unlock (&conn->ibc_lock);
        kibnal_check_sends (conn);
}
2245
/* Reject the connection attempt on CM endpoint 'cep' with reason 'why',
 * encoding our magic, protocol version and 'why' in the reject private
 * data so a compatible peer can decode it (cf. kibnal_check_connreject).
 * NB the static 'msgs' entries are rewritten on every call without
 * locking; this is benign since each msgs[why] is always filled with the
 * same compile-time-constant bytes. */
void
kibnal_reject (lnet_nid_t nid, IB_HANDLE cep, int why)
{
        static CM_REJECT_INFO  msgs[3];
        CM_REJECT_INFO        *msg = &msgs[why];
        FSTATUS                frc;

        LASSERT (why >= 0 && why < sizeof(msgs)/sizeof(msgs[0]));

        /* If I wasn't so lazy, I'd initialise this only once; it's effectively
         * read-only... */
        msg->Reason         = RC_USER_REJ;
        msg->PrivateData[0] = (IBNAL_MSG_MAGIC) & 0xff;
        msg->PrivateData[1] = (IBNAL_MSG_MAGIC >> 8) & 0xff;
        msg->PrivateData[2] = (IBNAL_MSG_MAGIC >> 16) & 0xff;
        msg->PrivateData[3] = (IBNAL_MSG_MAGIC >> 24) & 0xff;
        msg->PrivateData[4] = (IBNAL_MSG_VERSION) & 0xff;
        msg->PrivateData[5] = (IBNAL_MSG_VERSION >> 8) & 0xff;
        msg->PrivateData[6] = why;

        frc = iba_cm_reject(cep, msg);
        if (frc != FSUCCESS)
                CERROR("Error %d rejecting %s\n", frc, libcfs_nid2str(nid));
}
2270
/* Handle a CM rejection of an attempt of the given 'type' on 'conn'.
 * Decodes peer-supplied private data (see kibnal_reject) to distinguish
 * stale QPs, protocol-version mismatches and lost connection races, and
 * schedules a retry where one makes sense.  Always completes the attempt
 * via kibnal_connreq_done(-ECONNREFUSED). */
void
kibnal_check_connreject(kib_conn_t *conn, int type, CM_REJECT_INFO *rej)
{
        kib_peer_t    *peer = conn->ibc_peer;
        unsigned long  flags;
        int            magic;
        int            version;
        int            why;

        LASSERT (type == IBNAL_CONN_ACTIVE ||
                 type == IBNAL_CONN_PASSIVE);

        CDEBUG(D_NET, "%s connection with %s rejected: %d\n",
               (type == IBNAL_CONN_ACTIVE) ? "Active" : "Passive",
               libcfs_nid2str(peer->ibp_nid), rej->Reason);

        switch (rej->Reason) {
        case RC_STALE_CONN:
                if (type == IBNAL_CONN_PASSIVE) {
                        CERROR("Connection to %s rejected (stale QP)\n",
                               libcfs_nid2str(peer->ibp_nid));
                } else {
                        CWARN("Connection from %s rejected (stale QP): "
                              "retrying...\n", libcfs_nid2str(peer->ibp_nid));

                        /* retry from scratch to allocate a new conn 
                         * which will use a different QP */
                        kibnal_schedule_active_connect(peer, peer->ibp_version);
                }

                /* An FCM_DISCONNECTED callback is still outstanding: give it a
                 * ref since kibnal_connreq_done() drops the CM's ref on conn
                 * on failure */
                kibnal_conn_addref(conn);
                break;

        case RC_USER_REJ:
                /* Unpack what kibnal_reject() (or a peer's equivalent)
                 * packed into the private data */
                magic   = (rej->PrivateData[0]) |
                          (rej->PrivateData[1] << 8) |
                          (rej->PrivateData[2] << 16) |
                          (rej->PrivateData[3] << 24);
                version = (rej->PrivateData[4]) |
                          (rej->PrivateData[5] << 8);
                why     = (rej->PrivateData[6]);

                /* retry with old proto version */
                if (magic == IBNAL_MSG_MAGIC &&
                    version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD &&
                    conn->ibc_version == IBNAL_MSG_VERSION &&
                    type != IBNAL_CONN_PASSIVE) {
                        /* retry with a new conn */
                        CWARN ("Connection to %s refused: "
                               "retrying with old protocol version 0x%x\n", 
                               libcfs_nid2str(peer->ibp_nid), version);
                        kibnal_schedule_active_connect(peer, version);
                        break;
                }

                if (magic != IBNAL_MSG_MAGIC ||
                    version != IBNAL_MSG_VERSION) {
                        CERROR("%s connection with %s rejected "
                               "(magic/ver %08x/%d why %d): "
                               "incompatible protocol\n",
                               (type == IBNAL_CONN_ACTIVE) ?
                               "Active" : "Passive",
                               libcfs_nid2str(peer->ibp_nid),
                               magic, version, why);
                        break;
                }

                if (type == IBNAL_CONN_ACTIVE && 
                    why == IBNAL_REJECT_CONN_RACE) {
                        /* lost connection race */
                        CWARN("Connection to %s rejected: "
                              "lost connection race\n",
                              libcfs_nid2str(peer->ibp_nid));

                        write_lock_irqsave(&kibnal_data.kib_global_lock, 
                                           flags);

                        /* Wait for the winner's (passive) connection instead
                         * of retrying, unless one arrived already */
                        if (list_empty(&peer->ibp_conns)) {
                                peer->ibp_passivewait = 1;
                                peer->ibp_passivewait_deadline =
                                        jiffies + 
                                        (*kibnal_tunables.kib_timeout * HZ);
                        }
                        write_unlock_irqrestore(&kibnal_data.kib_global_lock, 
                                                flags);
                        break;
                }

                CERROR("%s connection with %s rejected: %d\n",
                       (type == IBNAL_CONN_ACTIVE) ? "Active" : "Passive",
                       libcfs_nid2str(peer->ibp_nid), why);
                break;

        default:
                CERROR("%s connection with %s rejected: %d\n",
                       (type == IBNAL_CONN_ACTIVE) ? "Active" : "Passive",
                       libcfs_nid2str(peer->ibp_nid), rej->Reason);
        }
        
        kibnal_connreq_done(conn, type, -ECONNREFUSED);
}
2375
2376 void
2377 kibnal_cm_disconnect_callback(kib_conn_t *conn, CM_CONN_INFO *info)
2378 {
2379         CDEBUG(D_NET, "%s: state %d, status 0x%x\n", 
2380                libcfs_nid2str(conn->ibc_peer->ibp_nid),
2381                conn->ibc_state, info->Status);
2382         
2383         LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
2384
2385         switch (info->Status) {
2386         default:
2387                 LBUG();
2388                 break;
2389
2390         case FCM_DISCONNECT_REQUEST:
2391                 /* Schedule conn to iba_cm_disconnect() if it wasn't already */
2392                 kibnal_close_conn (conn, 0);
2393                 break;
2394
2395         case FCM_DISCONNECT_REPLY:              /* peer acks my disconnect req */
2396         case FCM_DISCONNECTED:                  /* end of TIME_WAIT */
2397                 CDEBUG(D_NET, "Connection %s disconnected.\n",
2398                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
2399                 kibnal_conn_decref(conn);       /* Lose CM's ref */
2400                 break;
2401         }
2402 }
2403
2404 void
2405 kibnal_cm_passive_callback(IB_HANDLE cep, CM_CONN_INFO *info, void *arg)
2406 {
2407         kib_conn_t       *conn = arg;
2408
2409         CDEBUG(D_NET, "status 0x%x\n", info->Status);
2410
2411         /* Established Connection Notifier */
2412         switch (info->Status) {
2413         default:
2414                 CERROR("Unexpected status %d on Connection %s\n",
2415                        info->Status, libcfs_nid2str(conn->ibc_peer->ibp_nid));
2416                 LBUG();
2417                 break;
2418
2419         case FCM_CONNECT_TIMEOUT:
2420                 kibnal_connreq_done(conn, IBNAL_CONN_PASSIVE, -ETIMEDOUT);
2421                 break;
2422                 
2423         case FCM_CONNECT_REJECT:
2424                 kibnal_check_connreject(conn, IBNAL_CONN_PASSIVE, 
2425                                         &info->Info.Reject);
2426                 break;
2427
2428         case FCM_CONNECT_ESTABLISHED:
2429                 kibnal_connreq_done(conn, IBNAL_CONN_PASSIVE, 0);
2430                 break;
2431
2432         case FCM_DISCONNECT_REQUEST:
2433         case FCM_DISCONNECT_REPLY:
2434         case FCM_DISCONNECTED:
2435                 kibnal_cm_disconnect_callback(conn, info);
2436                 break;
2437         }
2438 }
2439
/* Validate an incoming connection request ('msg' is 'nob' bytes of CM
 * REQUEST private data) and on success create the conn (and peer, if
 * new) for it.  Returns 0 with *connp set, or a negative errno after
 * rejecting the connection on 'cep'. */
int
kibnal_accept (kib_conn_t **connp, IB_HANDLE cep, kib_msg_t *msg, int nob)
{
        lnet_nid_t     nid;
        kib_conn_t    *conn;
        kib_peer_t    *peer;
        kib_peer_t    *peer2;
        unsigned long  flags;
        int            rc;

        rc = kibnal_unpack_msg(msg, 0, nob);
        if (rc != 0) {
                /* SILENT! kibnal_unpack_msg() complains if required */
                kibnal_reject(LNET_NID_ANY, cep, IBNAL_REJECT_FATAL);
                return -EPROTO;
        }

        nid = msg->ibm_srcnid;

        /* old protocol versions are tolerated here; only warn */
        if (msg->ibm_version != IBNAL_MSG_VERSION)
                CWARN("Connection from %s: old protocol version 0x%x\n",
                      libcfs_nid2str(nid), msg->ibm_version);

        if (msg->ibm_type != IBNAL_MSG_CONNREQ) {
                CERROR("Can't accept %s: bad request type %d (%d expected)\n",
                       libcfs_nid2str(nid), msg->ibm_type, IBNAL_MSG_CONNREQ);
                kibnal_reject(nid, cep, IBNAL_REJECT_FATAL);
                return -EPROTO;
        }
        
        /* request must be addressed to me */
        if (msg->ibm_dstnid != kibnal_data.kib_ni->ni_nid) {
                CERROR("Can't accept %s: bad dst NID %s (%s expected)\n",
                       libcfs_nid2str(nid), 
                       libcfs_nid2str(msg->ibm_dstnid), 
                       libcfs_nid2str(kibnal_data.kib_ni->ni_nid));
                kibnal_reject(nid, cep, IBNAL_REJECT_FATAL);
                return -EPROTO;
        }
        
        /* connection parameters must match/fit my local limits */
        if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE ||
            msg->ibm_u.connparams.ibcp_max_msg_size > IBNAL_MSG_SIZE ||
            msg->ibm_u.connparams.ibcp_max_frags > IBNAL_MAX_RDMA_FRAGS) {
                CERROR("Reject %s: q %d sz %d frag %d, (%d %d %d expected)\n",
                       libcfs_nid2str(nid), 
                       msg->ibm_u.connparams.ibcp_queue_depth,
                       msg->ibm_u.connparams.ibcp_max_msg_size,
                       msg->ibm_u.connparams.ibcp_max_frags,
                       IBNAL_MSG_QUEUE_SIZE,
                       IBNAL_MSG_SIZE,
                       IBNAL_MAX_RDMA_FRAGS);
                kibnal_reject(nid, cep, IBNAL_REJECT_FATAL);
                return -EPROTO;
        }

        conn = kibnal_create_conn(nid, msg->ibm_version);
        if (conn == NULL) {
                kibnal_reject(nid, cep, IBNAL_REJECT_NO_RESOURCES);
                return -ENOMEM;
        }
        
        /* assume 'nid' is a new peer */
        rc = kibnal_create_peer(&peer, nid);
        if (rc != 0) {
                kibnal_conn_decref(conn);
                kibnal_reject(nid, cep, IBNAL_REJECT_NO_RESOURCES);
                return -ENOMEM;
        }
        
        write_lock_irqsave (&kibnal_data.kib_global_lock, flags);

        peer2 = kibnal_find_peer_locked(nid);
        if (peer2 == NULL) {
                /* peer table takes my ref on peer */
                list_add_tail (&peer->ibp_list, kibnal_nid2peerlist(nid));
                LASSERT (peer->ibp_connecting == 0);
        } else {
                /* nid was known after all: drop my speculative peer and
                 * use the existing one */
                kibnal_peer_decref(peer);
                peer = peer2;

                if (peer->ibp_connecting != 0 &&
                    peer->ibp_nid < kibnal_data.kib_ni->ni_nid) {
                        /* Resolve concurrent connection attempts in favour of
                         * the higher NID */
                        write_unlock_irqrestore(&kibnal_data.kib_global_lock, 
                                                flags);
                        kibnal_conn_decref(conn);
                        kibnal_reject(nid, cep, IBNAL_REJECT_CONN_RACE);
                        return -EALREADY;
                }
        }

        kibnal_peer_addref(peer); /* +1 ref for conn */
        peer->ibp_accepting++;

        kibnal_set_conn_state(conn, IBNAL_CONN_CONNECTING);
        conn->ibc_peer = peer;
        conn->ibc_incarnation = msg->ibm_srcstamp;
        conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
        conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
        /* both credit pools must be backed by posted receive buffers */
        LASSERT (conn->ibc_credits + conn->ibc_reserved_credits
                 <= IBNAL_RX_MSGS);

        write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

        *connp = conn;
        return 0;
}
2547
/* CM callback on the listening CEP: fields incoming connection REQUESTs
 * (and the CANCEL that signals listener shutdown).  A valid request is
 * validated by kibnal_accept(), the QP is moved to RTS and the reply is
 * sent with iba_cm_accept(). */
void
kibnal_listen_callback(IB_HANDLE cep, CM_CONN_INFO *info, void *arg)
{

        CM_REQUEST_INFO  *req = &info->Info.Request;
        CM_REPLY_INFO    *rep;
        kib_conn_t       *conn;
        FSTATUS           frc;
        int               rc;
        
        LASSERT(arg == NULL); /* no conn yet for passive */

        CDEBUG(D_NET, "%x\n", info->Status);
        
        if (info->Status == FCM_CONNECT_CANCEL) {
                /* listener torn down: wake whoever is waiting in shutdown */
                up(&kibnal_data.kib_listener_signal);
                return;
        }
        
        LASSERT (info->Status == FCM_CONNECT_REQUEST);

        rc = kibnal_accept(&conn, cep, (kib_msg_t *)req->PrivateData, 
                           CM_REQUEST_INFO_USER_LEN);
        if (rc != 0)                   /* kibnal_accept has rejected */
                return;

        conn->ibc_cvars->cv_path = req->PathInfo.Path;
        
        /* transition my QP to RTS using the peer's offered parameters */
        rc = kibnal_conn_rts(conn, 
                             req->CEPInfo.QPN, 
                             req->CEPInfo.OfferedInitiatorDepth,
                             req->CEPInfo.OfferedResponderResources,
                             req->CEPInfo.StartingPSN);
        if (rc != 0) {
                kibnal_reject(conn->ibc_peer->ibp_nid, cep, 
                              IBNAL_REJECT_NO_RESOURCES);
                kibnal_connreq_done(conn, IBNAL_CONN_PASSIVE, -ECONNABORTED);
                return;
        }

        /* build the CM REPLY from my local QP attributes */
        memset(&conn->ibc_cvars->cv_cmci, 0, sizeof(conn->ibc_cvars->cv_cmci));
        rep = &conn->ibc_cvars->cv_cmci.Info.Reply;

        rep->QPN                   = conn->ibc_cvars->cv_qpattrs.QPNumber;
        rep->QKey                  = conn->ibc_cvars->cv_qpattrs.Qkey;
        rep->StartingPSN           = conn->ibc_cvars->cv_qpattrs.RecvPSN;
        rep->EndToEndFlowControl   = conn->ibc_cvars->cv_qpattrs.FlowControl;
        rep->ArbInitiatorDepth     = conn->ibc_cvars->cv_qpattrs.InitiatorDepth;
        rep->ArbResponderResources = conn->ibc_cvars->cv_qpattrs.ResponderResources;
        rep->TargetAckDelay        = kibnal_data.kib_hca_attrs.LocalCaAckDelay;
        rep->FailoverAccepted      = IBNAL_FAILOVER_ACCEPTED;
        rep->RnRRetryCount         = req->CEPInfo.RnrRetryCount;
        
        /* the packed connack must fit in the REPLY's private data */
        CLASSERT (CM_REPLY_INFO_USER_LEN >=
                  offsetof(kib_msg_t, ibm_u) + sizeof(kib_connparams_t));

        kibnal_pack_connmsg((kib_msg_t *)rep->PrivateData,
                            conn->ibc_version,
                            CM_REPLY_INFO_USER_LEN,
                            IBNAL_MSG_CONNACK,
                            conn->ibc_peer->ibp_nid, conn->ibc_incarnation);

        LASSERT (conn->ibc_cep == NULL);
        kibnal_set_conn_state(conn, IBNAL_CONN_CONNECTING);

        /* on FSUCCESS/FPENDING the passive callback inherits my conn ref */
        frc = iba_cm_accept(cep, 
                            &conn->ibc_cvars->cv_cmci,
                            NULL,
                            kibnal_cm_passive_callback, conn, 
                            &conn->ibc_cep);

        if (frc == FSUCCESS || frc == FPENDING)
                return;
        
        CERROR("iba_cm_accept(%s) failed: %d\n", 
               libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
        kibnal_connreq_done(conn, IBNAL_CONN_PASSIVE, -ECONNABORTED);
}
2626
2627 void
2628 kibnal_check_connreply(kib_conn_t *conn, CM_REPLY_INFO *rep)
2629 {
2630         kib_msg_t   *msg = (kib_msg_t *)rep->PrivateData;
2631         lnet_nid_t   nid = conn->ibc_peer->ibp_nid;
2632         FSTATUS      frc;
2633         int          rc;
2634
2635         rc = kibnal_unpack_msg(msg, conn->ibc_version, CM_REPLY_INFO_USER_LEN);
2636         if (rc != 0) {
2637                 CERROR ("Error %d unpacking connack from %s\n",
2638                         rc, libcfs_nid2str(nid));
2639                 kibnal_reject(nid, conn->ibc_cep, IBNAL_REJECT_FATAL);
2640                 kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EPROTO);
2641                 return;
2642         }
2643                         
2644         if (msg->ibm_type != IBNAL_MSG_CONNACK) {
2645                 CERROR("Bad connack request type %d (%d expected) from %s\n",
2646                        msg->ibm_type, IBNAL_MSG_CONNREQ,
2647                        libcfs_nid2str(msg->ibm_srcnid));
2648                 kibnal_reject(nid, conn->ibc_cep, IBNAL_REJECT_FATAL);
2649                 kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EPROTO);
2650                 return;
2651         }
2652
2653         if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
2654             msg->ibm_dstnid != kibnal_data.kib_ni->ni_nid ||
2655             msg->ibm_dststamp != kibnal_data.kib_incarnation) {
2656                 CERROR("Stale connack from %s(%s): %s(%s), "LPX64"("LPX64")\n",
2657                        libcfs_nid2str(msg->ibm_srcnid), 
2658                        libcfs_nid2str(conn->ibc_peer->ibp_nid),
2659                        libcfs_nid2str(msg->ibm_dstnid),
2660                        libcfs_nid2str(kibnal_data.kib_ni->ni_nid),
2661                        msg->ibm_dststamp, kibnal_data.kib_incarnation);
2662                 kibnal_reject(nid, conn->ibc_cep, IBNAL_REJECT_FATAL);
2663                 kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -ESTALE);
2664                 return;
2665         }
2666         
2667         if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE ||
2668             msg->ibm_u.connparams.ibcp_max_msg_size > IBNAL_MSG_SIZE ||
2669             msg->ibm_u.connparams.ibcp_max_frags > IBNAL_MAX_RDMA_FRAGS) {
2670                 CERROR("Reject %s: q %d sz %d frag %d, (%d %d %d expected)\n",
2671                        libcfs_nid2str(msg->ibm_srcnid), 
2672                        msg->ibm_u.connparams.ibcp_queue_depth,
2673                        msg->ibm_u.connparams.ibcp_max_msg_size,
2674                        msg->ibm_u.connparams.ibcp_max_frags,
2675                        IBNAL_MSG_QUEUE_SIZE,
2676                        IBNAL_MSG_SIZE,
2677                        IBNAL_MAX_RDMA_FRAGS);
2678                 kibnal_reject(nid, conn->ibc_cep, IBNAL_REJECT_FATAL);
2679                 kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EPROTO);
2680                 return;
2681         }
2682                         
2683         CDEBUG(D_NET, "Connection %s REP_RECEIVED.\n",
2684                libcfs_nid2str(conn->ibc_peer->ibp_nid));
2685
2686         conn->ibc_incarnation = msg->ibm_srcstamp;
2687         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2688         conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
2689         LASSERT (conn->ibc_credits + conn->ibc_reserved_credits
2690                  <= IBNAL_RX_MSGS);
2691
2692         rc = kibnal_conn_rts(conn, 
2693                              rep->QPN,
2694                              rep->ArbInitiatorDepth,
2695                              rep->ArbResponderResources,
2696                              rep->StartingPSN);
2697         if (rc != 0) {
2698                 kibnal_reject(nid, conn->ibc_cep, IBNAL_REJECT_NO_RESOURCES);
2699                 kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EIO);
2700                 return;
2701         }
2702
2703         memset(&conn->ibc_cvars->cv_cmci, 0, sizeof(conn->ibc_cvars->cv_cmci));
2704         
2705         frc = iba_cm_accept(conn->ibc_cep, 
2706                             &conn->ibc_cvars->cv_cmci, 
2707                             NULL, NULL, NULL, NULL);
2708
2709         if (frc == FCM_CONNECT_ESTABLISHED) {
2710                 kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, 0);
2711                 return;
2712         }
2713         
2714         CERROR("Connection %s CMAccept failed: %d\n",
2715                libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
2716         kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -ECONNABORTED);
2717 }
2718
2719 void
2720 kibnal_cm_active_callback(IB_HANDLE cep, CM_CONN_INFO *info, void *arg)
2721 {
2722         kib_conn_t       *conn = arg;
2723
2724         CDEBUG(D_NET, "status 0x%x\n", info->Status);
2725
2726         switch (info->Status) {
2727         default:
2728                 CERROR("unknown status %d on Connection %s\n", 
2729                        info->Status, libcfs_nid2str(conn->ibc_peer->ibp_nid));
2730                 LBUG();
2731                 break;
2732
2733         case FCM_CONNECT_TIMEOUT:
2734                 kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -ETIMEDOUT);
2735                 break;
2736                 
2737         case FCM_CONNECT_REJECT:
2738                 kibnal_check_connreject(conn, IBNAL_CONN_ACTIVE,
2739                                         &info->Info.Reject);
2740                 break;
2741
2742         case FCM_CONNECT_REPLY:
2743                 kibnal_check_connreply(conn, &info->Info.Reply);
2744                 break;
2745
2746         case FCM_DISCONNECT_REQUEST:
2747         case FCM_DISCONNECT_REPLY:
2748         case FCM_DISCONNECTED:
2749                 kibnal_cm_disconnect_callback(conn, info);
2750                 break;
2751         }
2752 }
2753
2754 void
2755 dump_path_records(PATH_RESULTS *results)
2756 {
2757         IB_PATH_RECORD *path;
2758         int i;
2759
2760         for (i = 0; i < results->NumPathRecords; i++) {
2761                 path = &results->PathRecords[i];
2762                 CDEBUG(D_NET, "%d: sgid "LPX64":"LPX64" dgid "
2763                        LPX64":"LPX64" pkey %x\n",
2764                        i,
2765                        path->SGID.Type.Global.SubnetPrefix,
2766                        path->SGID.Type.Global.InterfaceID,
2767                        path->DGID.Type.Global.SubnetPrefix,
2768                        path->DGID.Type.Global.InterfaceID,
2769                        path->P_Key);
2770         }
2771 }
2772
/* SD callback with the path record lookup result for an active connect.
 * On success it creates the CEP, fills in the CM REQUEST (including the
 * packed connreq message) and initiates iba_cm_connect(); any failure
 * completes the connreq with -EHOSTUNREACH/-ENOMEM. */
void
kibnal_pathreq_callback (void *arg, QUERY *qry, 
                         QUERY_RESULT_VALUES *qrslt)
{
        IB_CA_ATTRIBUTES  *ca_attr = &kibnal_data.kib_hca_attrs;
        kib_conn_t        *conn = arg;
        CM_REQUEST_INFO   *req = &conn->ibc_cvars->cv_cmci.Info.Request;
        PATH_RESULTS      *path = (PATH_RESULTS *)qrslt->QueryResult;
        FSTATUS            frc;
        
        if (qrslt->Status != FSUCCESS || 
            qrslt->ResultDataSize < sizeof(*path)) {
                CDEBUG (D_NETERROR, "pathreq %s failed: status %d data size %d\n", 
                        libcfs_nid2str(conn->ibc_peer->ibp_nid),
                        qrslt->Status, qrslt->ResultDataSize);
                kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EHOSTUNREACH);
                return;
        }

        if (path->NumPathRecords < 1) {
                CDEBUG (D_NETERROR, "pathreq %s failed: no path records\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
                kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EHOSTUNREACH);
                return;
        }

        /* use the first path record returned */
        //dump_path_records(path);
        conn->ibc_cvars->cv_path = path->PathRecords[0];

        LASSERT (conn->ibc_cep == NULL);

        conn->ibc_cep = kibnal_create_cep(conn->ibc_peer->ibp_nid);
        if (conn->ibc_cep == NULL) {
                kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -ENOMEM);
                return;
        }

        /* build the CM connection request from the chosen path, my QP
         * attributes and the HCA's capabilities */
        memset(req, 0, sizeof(*req));
        req->SID                               = conn->ibc_cvars->cv_svcrec.RID.ServiceID;
        req->CEPInfo.CaGUID                    = kibnal_data.kib_hca_guids[kibnal_data.kib_hca_idx];
        req->CEPInfo.EndToEndFlowControl       = IBNAL_EE_FLOW;
        req->CEPInfo.PortGUID                  = conn->ibc_cvars->cv_path.SGID.Type.Global.InterfaceID;
        req->CEPInfo.RetryCount                = IBNAL_RETRY;
        req->CEPInfo.RnrRetryCount             = IBNAL_RNR_RETRY;
        req->CEPInfo.AckTimeout                = IBNAL_ACK_TIMEOUT;
        req->CEPInfo.StartingPSN               = IBNAL_STARTING_PSN;
        req->CEPInfo.QPN                       = conn->ibc_cvars->cv_qpattrs.QPNumber;
        req->CEPInfo.QKey                      = conn->ibc_cvars->cv_qpattrs.Qkey;
        req->CEPInfo.OfferedResponderResources = ca_attr->MaxQPResponderResources;
        req->CEPInfo.OfferedInitiatorDepth     = ca_attr->MaxQPInitiatorDepth;
        req->PathInfo.bSubnetLocal             = IBNAL_LOCAL_SUB;
        req->PathInfo.Path                     = conn->ibc_cvars->cv_path;

        /* the packed connreq must fit in the REQUEST's private data */
        CLASSERT (CM_REQUEST_INFO_USER_LEN >=
                  offsetof(kib_msg_t, ibm_u) + sizeof(kib_connparams_t));

        kibnal_pack_connmsg((kib_msg_t *)req->PrivateData, 
                            conn->ibc_version,
                            CM_REQUEST_INFO_USER_LEN,
                            IBNAL_MSG_CONNREQ, 
                            conn->ibc_peer->ibp_nid, 0);

        if (the_lnet.ln_testprotocompat != 0) {
                /* single-shot proto test: deliberately corrupt version
                 * and/or magic once to exercise the peer's reject path */
                LNET_LOCK();
                if ((the_lnet.ln_testprotocompat & 1) != 0) {
                        ((kib_msg_t *)req->PrivateData)->ibm_version++;
                        the_lnet.ln_testprotocompat &= ~1;
                }
                if ((the_lnet.ln_testprotocompat & 2) != 0) {
                        ((kib_msg_t *)req->PrivateData)->ibm_magic =
                                LNET_PROTO_MAGIC;
                        the_lnet.ln_testprotocompat &= ~2;
                }
                LNET_UNLOCK();
        }

        /* Flag I'm getting involved with the CM... */
        kibnal_set_conn_state(conn, IBNAL_CONN_CONNECTING);

        /* cm callback gets my conn ref */
        frc = iba_cm_connect(conn->ibc_cep, req, 
                             kibnal_cm_active_callback, conn);
        if (frc == FPENDING || frc == FSUCCESS)
                return;
        
        CERROR ("Connect %s failed: %d\n", 
                libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
        kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EHOSTUNREACH);
}
2863
2864 void
2865 kibnal_dump_service_records(SERVICE_RECORD_RESULTS *results)
2866 {
2867         IB_SERVICE_RECORD *svc;
2868         int i;
2869
2870         for (i = 0; i < results->NumServiceRecords; i++) {
2871                 svc = &results->ServiceRecords[i];
2872                 CDEBUG(D_NET, "%d: sid "LPX64" gid "LPX64":"LPX64" pkey %x\n",
2873                        i,
2874                        svc->RID.ServiceID,
2875                        svc->RID.ServiceGID.Type.Global.SubnetPrefix,
2876                        svc->RID.ServiceGID.Type.Global.InterfaceID,
2877                        svc->RID.ServiceP_Key);
2878         }
2879 }
2880
/* SD callback with the service record lookup result for an active
 * connect: stashes the peer's service record and kicks off the path
 * record query whose callback will drive the CM connect. */
void
kibnal_service_get_callback (void *arg, QUERY *qry, 
                             QUERY_RESULT_VALUES *qrslt)
{
        kib_conn_t              *conn = arg;
        SERVICE_RECORD_RESULTS  *svc;
        FSTATUS                  frc;

        if (qrslt->Status != FSUCCESS || 
            qrslt->ResultDataSize < sizeof(*svc)) {
                CDEBUG (D_NETERROR, "Lookup %s failed: status %d data size %d\n", 
                        libcfs_nid2str(conn->ibc_peer->ibp_nid),
                        qrslt->Status, qrslt->ResultDataSize);
                kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EHOSTUNREACH);
                return;
        }

        svc = (SERVICE_RECORD_RESULTS *)qrslt->QueryResult;
        if (svc->NumServiceRecords < 1) {
                CDEBUG (D_NETERROR, "lookup %s failed: no service records\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
                kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EHOSTUNREACH);
                return;
        }

        /* use the first service record returned */
        //kibnal_dump_service_records(svc);
        conn->ibc_cvars->cv_svcrec = svc->ServiceRecords[0];

        /* build the path query: my port GUID to the peer's port GUID
         * taken from the service record */
        qry = &conn->ibc_cvars->cv_query;
        memset(qry, 0, sizeof(*qry));

        qry->OutputType = OutputTypePathRecord;
        qry->InputType = InputTypePortGuidPair;

        qry->InputValue.PortGuidPair.SourcePortGuid = 
                kibnal_data.kib_port_guid;
        qry->InputValue.PortGuidPair.DestPortGuid  = 
                conn->ibc_cvars->cv_svcrec.RID.ServiceGID.Type.Global.InterfaceID;

        /* kibnal_pathreq_callback gets my conn ref */
        frc = iba_sd_query_port_fabric_info(kibnal_data.kib_sd,
                                            kibnal_data.kib_port_guid,
                                            qry, 
                                            kibnal_pathreq_callback,
                                            &kibnal_data.kib_sdretry,
                                            conn);
        if (frc == FPENDING)
                return;

        CERROR ("pathreq %s failed: %d\n", 
                libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
        kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EHOSTUNREACH);
}
2934
2935 void
2936 kibnal_connect_peer (kib_peer_t *peer)
2937 {
2938         QUERY                     *qry;
2939         FSTATUS                    frc;
2940         kib_conn_t                *conn;
2941
2942         LASSERT (peer->ibp_connecting != 0);
2943
2944         conn = kibnal_create_conn(peer->ibp_nid, peer->ibp_version);
2945         if (conn == NULL) {
2946                 CERROR ("Can't allocate conn\n");
2947                 kibnal_peer_connect_failed(peer, IBNAL_CONN_ACTIVE, -ENOMEM);
2948                 return;
2949         }
2950
2951         conn->ibc_peer = peer;
2952         kibnal_peer_addref(peer);
2953
2954         qry = &conn->ibc_cvars->cv_query;
2955         memset(qry, 0, sizeof(*qry));
2956
2957         qry->OutputType = OutputTypeServiceRecord;
2958         qry->InputType = InputTypeServiceRecord;
2959
2960         qry->InputValue.ServiceRecordValue.ComponentMask = 
2961                 KIBNAL_SERVICE_KEY_MASK;
2962         kibnal_set_service_keys(
2963                 &qry->InputValue.ServiceRecordValue.ServiceRecord, 
2964                 peer->ibp_nid);
2965
2966         /* kibnal_service_get_callback gets my conn ref */
2967         frc = iba_sd_query_port_fabric_info(kibnal_data.kib_sd,
2968                                             kibnal_data.kib_port_guid,
2969                                             qry,
2970                                             kibnal_service_get_callback,
2971                                             &kibnal_data.kib_sdretry, 
2972                                             conn);
2973         if (frc == FPENDING)
2974                 return;
2975
2976         CERROR("Lookup %s failed: %d\n", libcfs_nid2str(peer->ibp_nid), frc);
2977         kibnal_connreq_done(conn, IBNAL_CONN_ACTIVE, -EHOSTUNREACH);
2978 }
2979
2980 int
2981 kibnal_check_txs (kib_conn_t *conn, struct list_head *txs)
2982 {
2983         kib_tx_t          *tx;
2984         struct list_head  *ttmp;
2985         int                timed_out = 0;
2986
2987         spin_lock(&conn->ibc_lock);
2988
2989         list_for_each (ttmp, txs) {
2990                 tx = list_entry (ttmp, kib_tx_t, tx_list);
2991
2992                 if (txs == &conn->ibc_active_txs) {
2993                         LASSERT (!tx->tx_queued);
2994                         LASSERT (tx->tx_waiting || tx->tx_sending != 0);
2995                 } else {
2996                         LASSERT (tx->tx_queued);
2997                 }
2998
2999                 if (time_after_eq (jiffies, tx->tx_deadline)) {
3000                         timed_out = 1;
3001                         break;
3002                 }
3003         }
3004
3005         spin_unlock(&conn->ibc_lock);
3006         return timed_out;
3007 }
3008
3009 int
3010 kibnal_conn_timed_out (kib_conn_t *conn)
3011 {
3012         return  kibnal_check_txs(conn, &conn->ibc_tx_queue) ||
3013                 kibnal_check_txs(conn, &conn->ibc_tx_queue_rsrvd) ||
3014                 kibnal_check_txs(conn, &conn->ibc_tx_queue_nocred) ||
3015                 kibnal_check_txs(conn, &conn->ibc_active_txs);
3016 }
3017
/* Scan hash bucket 'idx' of the peer table for stuck peers and timed-out
 * connections.  Runs under a shared (read) lock in the expected no-work
 * case; whenever action is needed the lock is dropped first and the
 * whole scan restarts from the top. */
void
kibnal_check_peers (int idx)
{
        rwlock_t          *rwlock = &kibnal_data.kib_global_lock;
        struct list_head  *peers = &kibnal_data.kib_peers[idx];
        struct list_head  *ptmp;
        kib_peer_t        *peer;
        kib_conn_t        *conn;
        struct list_head  *ctmp;
        unsigned long      flags;

 again:
        /* NB. We expect to have a look at all the peers and not find any
         * rdmas to time out, so we just use a shared lock while we
         * take a look... */
        read_lock_irqsave(rwlock, flags);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, kib_peer_t, ibp_list);

                if (peer->ibp_passivewait) {
                        /* waiting for the peer to reconnect after losing a
                         * connection race; give up if it takes too long */
                        LASSERT (list_empty(&peer->ibp_conns));
                        
                        if (!time_after_eq(jiffies, 
                                           peer->ibp_passivewait_deadline))
                                continue;
                        
                        kibnal_peer_addref(peer); /* ++ ref for me... */
                        read_unlock_irqrestore(rwlock, flags);

                        kibnal_peer_connect_failed(peer, IBNAL_CONN_WAITING,
                                                   -ETIMEDOUT);
                        kibnal_peer_decref(peer); /* ...until here */
                        
                        /* start again now I've dropped the lock */
                        goto again;
                }

                list_for_each (ctmp, &peer->ibp_conns) {
                        conn = list_entry (ctmp, kib_conn_t, ibc_list);

                        LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);

                        /* In case we have enough credits to return via a
                         * NOOP, but there were no non-blocking tx descs
                         * free to do it last time... */
                        kibnal_check_sends(conn);

                        if (!kibnal_conn_timed_out(conn))
                                continue;

                        /* Handle timeout by closing the whole connection.  We
                         * can only be sure RDMA activity has ceased once the
                         * QP has been modified. */
                        
                        kibnal_conn_addref(conn); /* 1 ref for me... */

                        read_unlock_irqrestore(rwlock, flags);

                        CERROR("Timed out RDMA with %s\n",
                               libcfs_nid2str(peer->ibp_nid));

                        kibnal_close_conn (conn, -ETIMEDOUT);
                        kibnal_conn_decref(conn); /* ...until here */

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(rwlock, flags);
}
3090
3091 void
3092 kibnal_disconnect_conn (kib_conn_t *conn)
3093 {
3094         FSTATUS       frc;
3095
3096         LASSERT (conn->ibc_state == IBNAL_CONN_DISCONNECTING);
3097
3098         kibnal_conn_disconnected(conn);
3099                 
3100         frc = iba_cm_disconnect(conn->ibc_cep, NULL, NULL);
3101         switch (frc) {
3102         case FSUCCESS:
3103                 break;
3104                 
3105         case FINSUFFICIENT_RESOURCES:
3106                 CERROR("ENOMEM disconnecting %s\n",
3107                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
3108                 /* This might cause the module to become unloadable since the
3109                  * FCM_DISCONNECTED callback is still outstanding */
3110                 break;
3111                 
3112         default:
3113                 CERROR("Unexpected error disconnecting %s: %d\n",
3114                        libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
3115                 LBUG();
3116         }
3117
3118         kibnal_peer_notify(conn->ibc_peer);
3119 }
3120
3121 int
3122 kibnal_connd (void *arg)
3123 {
3124         wait_queue_t       wait;
3125         unsigned long      flags;
3126         kib_conn_t        *conn;
3127         kib_peer_t        *peer;
3128         int                timeout;
3129         int                i;
3130         int                did_something;
3131         int                peer_index = 0;
3132         unsigned long      deadline = jiffies;
3133         
3134         cfs_daemonize ("kibnal_connd");
3135         cfs_block_allsigs ();
3136
3137         init_waitqueue_entry (&wait, current);
3138
3139         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
3140
3141         while (!kibnal_data.kib_shutdown) {
3142                 did_something = 0;
3143
3144                 if (!list_empty (&kibnal_data.kib_connd_zombies)) {
3145                         conn = list_entry (kibnal_data.kib_connd_zombies.next,
3146                                            kib_conn_t, ibc_list);
3147                         list_del (&conn->ibc_list);
3148                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
3149                         did_something = 1;
3150
3151                         kibnal_destroy_conn(conn);
3152
3153                         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
3154                 }
3155
3156                 if (!list_empty (&kibnal_data.kib_connd_conns)) {
3157                         conn = list_entry (kibnal_data.kib_connd_conns.next,
3158                                            kib_conn_t, ibc_list);
3159                         list_del (&conn->ibc_list);
3160                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
3161                         did_something = 1;
3162
3163                         kibnal_disconnect_conn(conn);
3164                         kibnal_conn_decref(conn);
3165                         
3166                         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
3167                 }
3168
3169                 if (!list_empty (&kibnal_data.kib_connd_peers)) {
3170                         peer = list_entry (kibnal_data.kib_connd_peers.next,
3171                                            kib_peer_t, ibp_connd_list);
3172                         
3173                         list_del_init (&peer->ibp_connd_list);
3174                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
3175                         did_something = 1;
3176
3177                         kibnal_connect_peer (peer);
3178                         kibnal_peer_decref (peer);
3179
3180                         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
3181                 }
3182
3183                 /* careful with the jiffy wrap... */
3184                 while ((timeout = (int)(deadline - jiffies)) <= 0) {
3185                         const int n = 4;
3186                         const int p = 1;
3187                         int       chunk = kibnal_data.kib_peer_hash_size;
3188                         
3189                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
3190
3191                         /* Time to check for RDMA timeouts on a few more
3192                          * peers: I do checks every 'p' seconds on a
3193                          * proportion of the peer table and I need to check
3194                          * every connection 'n' times within a timeout
3195                          * interval, to ensure I detect a timeout on any
3196                          * connection within (n+1)/n times the timeout
3197                          * interval. */
3198
3199                         if (*kibnal_tunables.kib_timeout > n * p)
3200                                 chunk = (chunk * n * p) / 
3201                                         *kibnal_tunables.kib_timeout;
3202                         if (chunk == 0)
3203                                 chunk = 1;
3204
3205                         for (i = 0; i < chunk; i++) {
3206                                 kibnal_check_peers (peer_index);
3207                                 peer_index = (peer_index + 1) % 
3208                                              kibnal_data.kib_peer_hash_size;
3209                         }
3210
3211                         deadline += p * HZ;
3212                         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
3213                         did_something = 1;
3214                 }
3215
3216                 if (did_something)
3217                         continue;
3218
3219                 spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
3220
3221                 set_current_state (TASK_INTERRUPTIBLE);
3222                 add_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
3223
3224                 if (!kibnal_data.kib_shutdown &&
3225                     list_empty (&kibnal_data.kib_connd_conns) &&
3226                     list_empty (&kibnal_data.kib_connd_peers))
3227                         schedule_timeout (timeout);
3228
3229                 set_current_state (TASK_RUNNING);
3230                 remove_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
3231
3232                 spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
3233         }
3234
3235         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
3236
3237         kibnal_thread_fini ();
3238         return (0);
3239 }
3240
3241
void 
kibnal_hca_async_callback (void *hca_arg, IB_EVENT_RECORD *ev)
{
        /* HCA asynchronous event handler: currently just logs the event.
         * NOTE(review): presumably these are fatal/async HCA errors that
         * should trigger recovery — confirm against the IbAccess docs. */
        /* XXX flesh out.  this seems largely for async errors */
        CERROR("type: %d code: %u\n", ev->EventType, ev->EventCode);
}
3248
3249 void
3250 kibnal_hca_callback (void *hca_arg, void *cq_arg)
3251 {
3252         unsigned long flags;
3253
3254         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
3255         kibnal_data.kib_ready = 1;
3256         wake_up(&kibnal_data.kib_sched_waitq);
3257         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
3258 }
3259
int
kibnal_scheduler(void *arg)
{
        /* Scheduler thread: polls the single completion queue and dispatches
         * RX/TX/RDMA completions.  Several scheduler threads may run; the
         * kib_checking_cq flag ensures only one polls the CQ at a time,
         * while others handle completions or sleep.  'arg' is the thread
         * id used only to name the daemon.  Returns 0 on thread exit. */
        long               id = (long)arg;
        wait_queue_t       wait;
        char               name[16];
        FSTATUS            frc;
        FSTATUS            frc2;
        IB_WORK_COMPLETION wc;
        kib_rx_t          *rx;
        unsigned long      flags;
        __u64              rxseq = 0;
        int                busy_loops = 0;

        snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);

        while (!kibnal_data.kib_shutdown) {
                /* Yield the CPU periodically so we don't hog it while the
                 * CQ stays busy */
                if (busy_loops++ >= IBNAL_RESCHED) {
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);

                        our_cond_resched();
                        busy_loops = 0;

                        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
                }

                if (kibnal_data.kib_ready &&
                    !kibnal_data.kib_checking_cq) {
                        /* take ownership of completion polling */
                        kibnal_data.kib_checking_cq = 1;
                        /* Assume I'll exhaust the CQ */
                        kibnal_data.kib_ready = 0;
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);

                        frc = iba_poll_cq(kibnal_data.kib_cq, &wc);
                        if (frc == FNOT_DONE) {
                                /* CQ empty: re-arm so the HCA callback fires
                                 * on the next completion */
                                frc2 = iba_rearm_cq(kibnal_data.kib_cq,
                                                    CQEventSelNextWC);
                                LASSERT (frc2 == FSUCCESS);
                        }

                        if (frc == FSUCCESS &&
                            kibnal_wreqid2type(wc.WorkReqId) == IBNAL_WID_RX) {
                                rx = (kib_rx_t *)kibnal_wreqid2ptr(wc.WorkReqId);

                                /* Grab the RX sequence number NOW before
                                 * anyone else can get an RX completion */
                                rxseq = rx->rx_conn->ibc_rxseq++;
                        }

                        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
                        /* give up ownership of completion polling */
                        kibnal_data.kib_checking_cq = 0;

                        if (frc == FNOT_DONE)
                                continue;

                        LASSERT (frc == FSUCCESS);
                        /* Assume there's more: get another scheduler to check
                         * while I handle this completion... */

                        kibnal_data.kib_ready = 1;
                        wake_up(&kibnal_data.kib_sched_waitq);

                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);

                        /* Dispatch on the work-request-id type encoded in
                         * the completion */
                        switch (kibnal_wreqid2type(wc.WorkReqId)) {
                        case IBNAL_WID_RX:
                                kibnal_rx_complete(&wc, rxseq);
                                break;

                        case IBNAL_WID_TX:
                                kibnal_tx_complete(&wc);
                                break;

                        case IBNAL_WID_RDMA:
                                /* We only get RDMA completion notification if
                                 * it fails.  So we just ignore them completely
                                 * because...
                                 *
                                 * 1) If an RDMA fails, all subsequent work
                                 * items, including the final SEND will fail
                                 * too, so I'm still guaranteed to notice that
                                 * this connection is hosed.
                                 *
                                 * 2) It's positively dangerous to look inside
                                 * the tx descriptor obtained from an RDMA work
                                 * item.  As soon as I drop the kib_sched_lock,
                                 * I give a scheduler on another CPU a chance
                                 * to get the final SEND completion, so the tx
                                 * descriptor can get freed as I inspect it. */
                                CERROR ("RDMA failed: %d\n", wc.Status);
                                break;

                        default:
                                LBUG();
                        }

                        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
                        continue;
                }

                /* Nothing to do; sleep... */

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue_exclusive(&kibnal_data.kib_sched_waitq, &wait);
                spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                       flags);

                schedule();

                remove_wait_queue(&kibnal_data.kib_sched_waitq, &wait);
                set_current_state(TASK_RUNNING);
                spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
        }

        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);

        kibnal_thread_fini();
        return (0);
}