lnet/klnds/viblnd/viblnd_cb.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *   Author: Frank Zago <fzago@systemfabricworks.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "viblnd.h"

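/* Return a tx descriptor to the idle pool and finalise any lnet messages it
 * carries.  NB lnet_finalize() is only called after the descriptor's FMR
 * mapping (if any) has been torn down and its connection ref dropped, so LNET
 * never sees a message complete while its buffers are still mapped. */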
void
kibnal_tx_done (kib_tx_t *tx)
{
        lnet_msg_t *lntmsg[2];
        int         rc = tx->tx_status;
        int         i;

        LASSERT (!in_interrupt());
        LASSERT (!tx->tx_queued);               /* mustn't be queued for sending */
        LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting sent callback */
        LASSERT (!tx->tx_waiting);              /* mustn't be awaiting peer response */

#if IBNAL_USE_FMR
        if (tx->tx_md.md_fmrcount == 0 ||
            (rc != 0 && tx->tx_md.md_active)) {
                vv_return_t      vvrc;

                /* mapping must be active (it dropped fmrcount to 0) */
                LASSERT (tx->tx_md.md_active);

                vvrc = vv_unmap_fmr(kibnal_data.kib_hca,
                                    1, &tx->tx_md.md_fmrhandle);
                LASSERT (vvrc == vv_return_ok);

                tx->tx_md.md_fmrcount = *kibnal_tunables.kib_fmr_remaps;
        }
        tx->tx_md.md_active = 0;
#endif

        /* tx may have up to 2 lnet msgs to finalise */
        lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
        lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;

        if (tx->tx_conn != NULL) {
                kibnal_conn_decref(tx->tx_conn);
                tx->tx_conn = NULL;
        }

        tx->tx_nwrq = 0;
        tx->tx_status = 0;

        spin_lock(&kibnal_data.kib_tx_lock);

        list_add (&tx->tx_list, &kibnal_data.kib_idle_txs);

        spin_unlock(&kibnal_data.kib_tx_lock);

        /* delay finalize until my descs have been freed */
        for (i = 0; i < 2; i++) {
                if (lntmsg[i] == NULL)
                        continue;

                lnet_finalize (kibnal_data.kib_ni, lntmsg[i], rc);
        }
}

void
kibnal_txlist_done (struct list_head *txlist, int status)
{
        kib_tx_t *tx;

        while (!list_empty (txlist)) {
                tx = list_entry (txlist->next, kib_tx_t, tx_list);

                list_del (&tx->tx_list);
                /* complete now */
                tx->tx_waiting = 0;
                tx->tx_status = status;
                kibnal_tx_done (tx);
        }
}

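/* Pull a tx descriptor off the idle pool, or return NULL if the pool is
 * empty.  Each tx is stamped with a fresh completion cookie while the pool
 * lock is still held; peers echo this cookie back so completions can be
 * matched to the waiting tx (see kibnal_find_waiting_tx_locked). */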
kib_tx_t *
kibnal_get_idle_tx (void)
{
        kib_tx_t      *tx;

        spin_lock(&kibnal_data.kib_tx_lock);

        if (list_empty (&kibnal_data.kib_idle_txs)) {
                spin_unlock(&kibnal_data.kib_tx_lock);
                return NULL;
        }

        tx = list_entry (kibnal_data.kib_idle_txs.next, kib_tx_t, tx_list);
        list_del (&tx->tx_list);

        /* Allocate a new completion cookie.  It might not be needed,
         * but we've got a lock right now and we're unlikely to
         * wrap... */
        tx->tx_cookie = kibnal_data.kib_next_tx_cookie++;

        spin_unlock(&kibnal_data.kib_tx_lock);

        LASSERT (tx->tx_nwrq == 0);
        LASSERT (!tx->tx_queued);
        LASSERT (tx->tx_sending == 0);
        LASSERT (!tx->tx_waiting);
        LASSERT (tx->tx_status == 0);
        LASSERT (tx->tx_conn == NULL);
        LASSERT (tx->tx_lntmsg[0] == NULL);
        LASSERT (tx->tx_lntmsg[1] == NULL);

        return tx;
}

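/* (Re)post a receive buffer on its connection's queue pair.  'credit'
 * returns a normal flow-control credit to the peer and 'rsrvd_credit'
 * returns a buffer reserved for RDMA replies; since returning either may
 * unblock queued sends, success ends with kibnal_check_sends(). */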
int
kibnal_post_rx (kib_rx_t *rx, int credit, int rsrvd_credit)
{
        kib_conn_t   *conn = rx->rx_conn;
        int           rc = 0;
        __u64         addr = (__u64)((unsigned long)((rx)->rx_msg));
        vv_return_t   vvrc;

        LASSERT (!in_interrupt());
        /* old peers don't reserve rxs for RDMA replies */
        LASSERT (!rsrvd_credit ||
                 conn->ibc_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);

        rx->rx_gl = (vv_scatgat_t) {
                .v_address = KIBNAL_ADDR2SG(addr),
                .l_key     = rx->rx_lkey,
                .length    = IBNAL_MSG_SIZE,
        };

        rx->rx_wrq = (vv_wr_t) {
                .wr_id                   = kibnal_ptr2wreqid(rx, IBNAL_WID_RX),
                .completion_notification = 1,
                .scatgat_list            = &rx->rx_gl,
                .num_of_data_segments    = 1,
                .wr_type                 = vv_wr_receive,
        };

        LASSERT (conn->ibc_state >= IBNAL_CONN_INIT);
        LASSERT (rx->rx_nob >= 0);              /* not posted */

        CDEBUG(D_NET, "posting rx [%d %x "LPX64"]\n",
               rx->rx_wrq.scatgat_list->length,
               rx->rx_wrq.scatgat_list->l_key,
               KIBNAL_SG2ADDR(rx->rx_wrq.scatgat_list->v_address));

        if (conn->ibc_state > IBNAL_CONN_ESTABLISHED) {
                /* No more posts for this rx; so lose its ref */
                kibnal_conn_decref(conn);
                return 0;
        }

        rx->rx_nob = -1;                        /* flag posted */

        spin_lock(&conn->ibc_lock);
        /* Serialise vv_post_receive; it's not re-entrant on the same QP */
        vvrc = vv_post_receive(kibnal_data.kib_hca,
                               conn->ibc_qp, &rx->rx_wrq);

        if (vvrc == vv_return_ok) {
                if (credit)
                        conn->ibc_outstanding_credits++;
                if (rsrvd_credit)
                        conn->ibc_reserved_credits++;

                spin_unlock(&conn->ibc_lock);

                if (credit || rsrvd_credit)
                        kibnal_check_sends(conn);

                return 0;
        }

        spin_unlock(&conn->ibc_lock);

        CERROR ("post rx -> %s failed %d\n",
                libcfs_nid2str(conn->ibc_peer->ibp_nid), vvrc);
        rc = -EIO;
        kibnal_close_conn(conn, rc);
        /* No more posts for this rx; so lose its ref */
        kibnal_conn_decref(conn);
        return rc;
}

int
kibnal_post_receives (kib_conn_t *conn)
{
        int    i;
        int    rc;

        LASSERT (conn->ibc_state < IBNAL_CONN_ESTABLISHED);
        LASSERT (conn->ibc_comms_error == 0);

        for (i = 0; i < IBNAL_RX_MSGS; i++) {
                /* +1 ref for rx desc.  This ref remains until kibnal_post_rx
                 * fails (i.e. actual failure or we're disconnecting) */
                kibnal_conn_addref(conn);
                rc = kibnal_post_rx (&conn->ibc_rxs[i], 0, 0);
                if (rc != 0)
                        return rc;
        }

        return 0;
}

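/* Find the active tx a peer's completion message refers to, matching on
 * cookie and expected message type.  Caller must hold ibc_lock: everything
 * on ibc_active_txs is either still posted (tx_sending != 0) or awaiting a
 * peer response (tx_waiting). */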
kib_tx_t *
kibnal_find_waiting_tx_locked(kib_conn_t *conn, int txtype, __u64 cookie)
{
        struct list_head   *tmp;

        list_for_each(tmp, &conn->ibc_active_txs) {
                kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);

                LASSERT (!tx->tx_queued);
                LASSERT (tx->tx_sending != 0 || tx->tx_waiting);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_waiting &&
                    tx->tx_msg->ibm_type == txtype)
                        return tx;

                CWARN("Bad completion: %swaiting, type %x (wanted %x)\n",
                      tx->tx_waiting ? "" : "NOT ",
                      tx->tx_msg->ibm_type, txtype);
        }
        return NULL;
}

void
kibnal_handle_completion(kib_conn_t *conn, int txtype, int status, __u64 cookie)
{
        kib_tx_t    *tx;
        int          idle;

        spin_lock(&conn->ibc_lock);

        tx = kibnal_find_waiting_tx_locked(conn, txtype, cookie);
        if (tx == NULL) {
                spin_unlock(&conn->ibc_lock);

                CWARN("Unmatched completion type %x cookie "LPX64" from %s\n",
                      txtype, cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                kibnal_close_conn (conn, -EPROTO);
                return;
        }

        if (tx->tx_status == 0) {               /* success so far */
                if (status < 0) {               /* failed? */
                        tx->tx_status = status;
                } else if (txtype == IBNAL_MSG_GET_REQ) {
                        lnet_set_reply_msg_len(kibnal_data.kib_ni,
                                               tx->tx_lntmsg[1], status);
                }
        }

        tx->tx_waiting = 0;

        idle = !tx->tx_queued && (tx->tx_sending == 0);
        if (idle)
                list_del(&tx->tx_list);

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kibnal_tx_done(tx);
}

void
kibnal_send_completion (kib_conn_t *conn, int type, int status, __u64 cookie)
{
        kib_tx_t    *tx = kibnal_get_idle_tx();

        if (tx == NULL) {
                CERROR("Can't get tx for completion %x for %s\n",
                       type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                return;
        }

        tx->tx_msg->ibm_u.completion.ibcm_status = status;
        tx->tx_msg->ibm_u.completion.ibcm_cookie = cookie;
        kibnal_init_tx_msg(tx, type, sizeof(kib_completion_msg_t));

        kibnal_queue_tx(tx, conn);
}

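/* Dispatch one received message on an established connection.  Credits
 * piggy-backed on the message are banked first (possibly unblocking queued
 * sends), then the message is handled by type.  The rx buffer is reposted
 * unless it was handed up to LNET via a successful lnet_parse(), which then
 * owns it. */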
void
kibnal_handle_rx (kib_rx_t *rx)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        int           credits = msg->ibm_credits;
        kib_tx_t     *tx;
        int           rc = 0;
        int           repost = 1;
        int           rsrvd_credit = 0;
        int           rc2;

        LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);

        CDEBUG (D_NET, "Received %x[%d] from %s\n",
                msg->ibm_type, credits, libcfs_nid2str(conn->ibc_peer->ibp_nid));

        if (credits != 0) {
                /* Have I received credits that will let me send? */
                spin_lock(&conn->ibc_lock);
                conn->ibc_credits += credits;
                spin_unlock(&conn->ibc_lock);

                kibnal_check_sends(conn);
        }

        switch (msg->ibm_type) {
        default:
                CERROR("Bad IBNAL message type %x from %s\n",
                       msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                rc = -EPROTO;
                break;

        case IBNAL_MSG_NOOP:
                break;

        case IBNAL_MSG_IMMEDIATE:
                rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.immediate.ibim_hdr,
                                msg->ibm_srcnid, rx, 0);
                repost = rc < 0;                /* repost on error */
                break;

        case IBNAL_MSG_PUT_REQ:
                rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.putreq.ibprm_hdr,
                                msg->ibm_srcnid, rx, 1);
                repost = rc < 0;                /* repost on error */
                break;

        case IBNAL_MSG_PUT_NAK:
                rsrvd_credit = 1;               /* rdma reply (was pre-reserved) */

                CWARN ("PUT_NAK from %s\n", libcfs_nid2str(conn->ibc_peer->ibp_nid));
                kibnal_handle_completion(conn, IBNAL_MSG_PUT_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBNAL_MSG_PUT_ACK:
                rsrvd_credit = 1;               /* rdma reply (was pre-reserved) */

                spin_lock(&conn->ibc_lock);
                tx = kibnal_find_waiting_tx_locked(conn, IBNAL_MSG_PUT_REQ,
                                                   msg->ibm_u.putack.ibpam_src_cookie);
                if (tx != NULL)
                        list_del(&tx->tx_list);
                spin_unlock(&conn->ibc_lock);

                if (tx == NULL) {
                        CERROR("Unmatched PUT_ACK from %s\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        rc = -EPROTO;
                        break;
                }

                LASSERT (tx->tx_waiting);
                /* CAVEAT EMPTOR: I could be racing with tx_complete, but...
                 * (a) I can overwrite tx_msg since my peer has received it!
                 * (b) tx_waiting set tells tx_complete() it's not done. */

                tx->tx_nwrq = 0;                /* overwrite PUT_REQ */

                rc2 = kibnal_init_rdma(tx, IBNAL_MSG_PUT_DONE,
                                       kibnal_rd_size(&msg->ibm_u.putack.ibpam_rd),
                                       &msg->ibm_u.putack.ibpam_rd,
                                       msg->ibm_u.putack.ibpam_dst_cookie);
                if (rc2 < 0)
                        CERROR("Can't setup rdma for PUT to %s: %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc2);

                spin_lock(&conn->ibc_lock);
                if (tx->tx_status == 0 && rc2 < 0)
                        tx->tx_status = rc2;
                tx->tx_waiting = 0;             /* clear waiting and queue atomically */
                kibnal_queue_tx_locked(tx, conn);
                spin_unlock(&conn->ibc_lock);
                break;

        case IBNAL_MSG_PUT_DONE:
                /* This buffer was pre-reserved by not returning the credit
                 * when the PUT_REQ's buffer was reposted, so I just return it
                 * now */
                kibnal_handle_completion(conn, IBNAL_MSG_PUT_ACK,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBNAL_MSG_GET_REQ:
                rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.get.ibgm_hdr,
                                msg->ibm_srcnid, rx, 1);
                repost = rc < 0;                /* repost on error */
                break;

        case IBNAL_MSG_GET_DONE:
                rsrvd_credit = 1;               /* rdma reply (was pre-reserved) */

                kibnal_handle_completion(conn, IBNAL_MSG_GET_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;
        }

        if (rc < 0)                             /* protocol error */
                kibnal_close_conn(conn, rc);

        if (repost) {
                if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD)
                        rsrvd_credit = 0;       /* peer isn't pre-reserving */

                kibnal_post_rx(rx, !rsrvd_credit, rsrvd_credit);
        }
}

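/* Completion handler for a posted receive.  The message is unpacked and
 * sanity-checked (source/destination NIDs, incarnation stamps, sequence
 * number) before being handled; rxs that race with connection establishment
 * are parked on ibc_early_rxs under the global lock. */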
void
kibnal_rx_complete (kib_rx_t *rx, vv_comp_status_t vvrc, int nob, __u64 rxseq)
{
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        unsigned long flags;
        int           rc;

        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        LASSERT (rx->rx_nob < 0);               /* was posted */
        rx->rx_nob = 0;                         /* isn't now */

        if (conn->ibc_state > IBNAL_CONN_ESTABLISHED)
                goto ignore;

        if (vvrc != vv_comp_status_success) {
                CERROR("Rx from %s failed: %d\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid), vvrc);
                goto failed;
        }

        rc = kibnal_unpack_msg(msg, conn->ibc_version, nob);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from %s\n",
                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                goto failed;
        }

        rx->rx_nob = nob;                       /* Can trust 'nob' now */

        if (!lnet_ptlcompat_matchnid(conn->ibc_peer->ibp_nid,
                                     msg->ibm_srcnid) ||
            !lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
                                     msg->ibm_dstnid) ||
            msg->ibm_srcstamp != conn->ibc_incarnation ||
            msg->ibm_dststamp != kibnal_data.kib_incarnation) {
                CERROR ("Stale rx from %s\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
                goto failed;
        }

        if (msg->ibm_seq != rxseq) {
                CERROR ("Out-of-sequence rx from %s"
                        ": got "LPD64" but expected "LPD64"\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid),
                        msg->ibm_seq, rxseq);
                goto failed;
        }

        /* set time last known alive */
        kibnal_peer_alive(conn->ibc_peer);

        /* racing with connection establishment/teardown! */

        if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
                /* must check holding global lock to eliminate race */
                if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                        list_add_tail(&rx->rx_list, &conn->ibc_early_rxs);
                        write_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                                flags);
                        return;
                }
                write_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                        flags);
        }
        kibnal_handle_rx(rx);
        return;

 failed:
        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        kibnal_close_conn(conn, -EIO);
 ignore:
        /* Don't re-post rx & drop its ref on conn */
        kibnal_conn_decref(conn);
}

struct page *
kibnal_kvaddr_to_page (unsigned long vaddr)
{
        struct page *page;

        if (vaddr >= VMALLOC_START &&
            vaddr < VMALLOC_END) {
                page = vmalloc_to_page ((void *)vaddr);
                LASSERT (page != NULL);
                return page;
        }
#ifdef CONFIG_HIGHMEM
        if (vaddr >= PKMAP_BASE &&
            vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE)) {
                /* No highmem kvaddrs: highmem pages are only used for bulk (kiov) I/O */
                CERROR("Can't find page for highmem address\n");
                LBUG();
        }
#endif
        page = virt_to_page (vaddr);
        LASSERT (page != NULL);
        return page;
}

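/* Two mutually exclusive ways of describing RDMA payload follow.  Without
 * FMR, each page becomes one fragment of the rdma descriptor, relying on the
 * HCA's single whole-memory region for keys; with FMR, the page array is
 * mapped into one virtually contiguous region described by a single
 * key/address/length triple. */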
#if !IBNAL_USE_FMR
int
kibnal_append_rdfrag(kib_rdma_desc_t *rd, int active, struct page *page,
                     unsigned long page_offset, unsigned long len)
{
        kib_rdma_frag_t *frag = &rd->rd_frags[rd->rd_nfrag];
        vv_l_key_t       l_key;
        vv_r_key_t       r_key;
        __u64            addr;
        __u64            frag_addr;
        vv_mem_reg_h_t   mem_h;
        vv_return_t      vvrc;

        if (rd->rd_nfrag >= IBNAL_MAX_RDMA_FRAGS) {
                CERROR ("Too many RDMA fragments\n");
                return -EMSGSIZE;
        }

        /* Try to create an address that adaptor-tavor will munge into a valid
         * network address, given how it maps all phys mem into 1 region */
        addr = lnet_page2phys(page) + page_offset + PAGE_OFFSET;

        /* NB this relies entirely on there being a single region for the whole
         * of memory, since "high" memory will wrap in the (void *) cast! */
        vvrc = vv_get_gen_mr_attrib(kibnal_data.kib_hca,
                                    (void *)((unsigned long)addr),
                                    len, &mem_h, &l_key, &r_key);
        LASSERT (vvrc == vv_return_ok);

        if (active) {
                if (rd->rd_nfrag == 0) {
                        rd->rd_key = l_key;
                } else if (l_key != rd->rd_key) {
                        CERROR ("> 1 key for single RDMA desc\n");
                        return -EINVAL;
                }
                frag_addr = addr;
        } else {
                if (rd->rd_nfrag == 0) {
                        rd->rd_key = r_key;
                } else if (r_key != rd->rd_key) {
                        CERROR ("> 1 key for single RDMA desc\n");
                        return -EINVAL;
                }

                frag_addr = kibnal_addr2net(addr);
        }

        kibnal_rf_set(frag, frag_addr, len);

        CDEBUG(D_NET,"map frag [%d][%d %x %08x%08x] "LPX64"\n",
               rd->rd_nfrag, frag->rf_nob, rd->rd_key,
               frag->rf_addr_hi, frag->rf_addr_lo, frag_addr);

        rd->rd_nfrag++;
        return 0;
}

int
kibnal_setup_rd_iov(kib_tx_t *tx, kib_rdma_desc_t *rd,
                    vv_access_con_bit_mask_t access,
                    unsigned int niov, struct iovec *iov, int offset, int nob)
{
        /* active if I'm sending */
        int           active = ((access & vv_acc_r_mem_write) == 0);
        int           fragnob;
        int           rc;
        unsigned long vaddr;
        struct page  *page;
        int           page_offset;

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT ((rd != tx->tx_rd) == !active);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        rd->rd_nfrag = 0;
        do {
                LASSERT (niov > 0);

                vaddr = ((unsigned long)iov->iov_base) + offset;
                page_offset = vaddr & (PAGE_SIZE - 1);
                page = kibnal_kvaddr_to_page(vaddr);
                if (page == NULL) {
                        CERROR ("Can't find page\n");
                        return -EFAULT;
                }

                fragnob = min((int)(iov->iov_len - offset), nob);
                fragnob = min(fragnob, (int)PAGE_SIZE - page_offset);

                rc = kibnal_append_rdfrag(rd, active, page,
                                          page_offset, fragnob);
                if (rc != 0)
                        return rc;

                if (offset + fragnob < iov->iov_len) {
                        offset += fragnob;
                } else {
                        offset = 0;
                        iov++;
                        niov--;
                }
                nob -= fragnob;
        } while (nob > 0);

        return 0;
}

int
kibnal_setup_rd_kiov (kib_tx_t *tx, kib_rdma_desc_t *rd,
                      vv_access_con_bit_mask_t access,
                      int nkiov, lnet_kiov_t *kiov, int offset, int nob)
{
        /* active if I'm sending */
        int            active = ((access & vv_acc_r_mem_write) == 0);
        int            fragnob;
        int            rc;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT ((rd != tx->tx_rd) == !active);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        rd->rd_nfrag = 0;
        do {
                LASSERT (nkiov > 0);
                fragnob = min((int)(kiov->kiov_len - offset), nob);

                rc = kibnal_append_rdfrag(rd, active, kiov->kiov_page,
                                          kiov->kiov_offset + offset,
                                          fragnob);
                if (rc != 0)
                        return rc;

                offset = 0;
                kiov++;
                nkiov--;
                nob -= fragnob;
        } while (nob > 0);

        return 0;
}
#else
int
kibnal_map_tx (kib_tx_t *tx, kib_rdma_desc_t *rd, int active,
               int npages, unsigned long page_offset, int nob)
{
        vv_return_t   vvrc;
        vv_fmr_map_t  map_props;

        LASSERT ((rd != tx->tx_rd) == !active);
        LASSERT (!tx->tx_md.md_active);
        LASSERT (tx->tx_md.md_fmrcount > 0);
        LASSERT (page_offset < PAGE_SIZE);
        LASSERT (npages >= (1 + ((page_offset + nob - 1)>>PAGE_SHIFT)));
        LASSERT (npages <= LNET_MAX_IOV);

        memset(&map_props, 0, sizeof(map_props));

        map_props.start          = (void *)page_offset;
        map_props.size           = nob;
        map_props.page_array_len = npages;
        map_props.page_array     = tx->tx_pages;

        vvrc = vv_map_fmr(kibnal_data.kib_hca, tx->tx_md.md_fmrhandle,
                          &map_props, &tx->tx_md.md_lkey, &tx->tx_md.md_rkey);
        if (vvrc != vv_return_ok) {
                CERROR ("Can't map vaddr %p for %d in %d pages: %d\n",
                        map_props.start, nob, npages, vvrc);
                return -EFAULT;
        }

        tx->tx_md.md_addr = (unsigned long)map_props.start;
        tx->tx_md.md_active = 1;
        tx->tx_md.md_fmrcount--;

        rd->rd_key = active ? tx->tx_md.md_lkey : tx->tx_md.md_rkey;
        rd->rd_nob = nob;
        rd->rd_addr = tx->tx_md.md_addr;

        /* Compensate for adaptor-tavor's munging of gatherlist addresses */
        if (active)
                rd->rd_addr += PAGE_OFFSET;

        return 0;
}

int
kibnal_setup_rd_iov (kib_tx_t *tx, kib_rdma_desc_t *rd,
                     vv_access_con_bit_mask_t access,
                     unsigned int niov, struct iovec *iov, int offset, int nob)
{
        /* active if I'm sending */
        int           active = ((access & vv_acc_r_mem_write) == 0);
        int           resid;
        int           fragnob;
        struct page  *page;
        int           npages;
        unsigned long page_offset;
        unsigned long vaddr;

        LASSERT (nob > 0);
        LASSERT (niov > 0);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR ("Can't map multiple vaddr fragments\n");
                return (-EMSGSIZE);
        }

        vaddr = ((unsigned long)iov->iov_base) + offset;

        page_offset = vaddr & (PAGE_SIZE - 1);
        resid = nob;
        npages = 0;

        do {
                LASSERT (npages < LNET_MAX_IOV);

                page = kibnal_kvaddr_to_page(vaddr);
                if (page == NULL) {
                        CERROR("Can't find page for %lu\n", vaddr);
                        return -EFAULT;
                }

                tx->tx_pages[npages++] = lnet_page2phys(page);

                fragnob = PAGE_SIZE - (vaddr & (PAGE_SIZE - 1));
                vaddr += fragnob;
                resid -= fragnob;

        } while (resid > 0);

        return kibnal_map_tx(tx, rd, active, npages, page_offset, nob);
}

int
kibnal_setup_rd_kiov (kib_tx_t *tx, kib_rdma_desc_t *rd,
                      vv_access_con_bit_mask_t access,
                      int nkiov, lnet_kiov_t *kiov, int offset, int nob)
{
        /* active if I'm sending */
        int            active = ((access & vv_acc_r_mem_write) == 0);
        int            resid;
        int            npages;
        unsigned long  page_offset;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (nkiov <= LNET_MAX_IOV);
        LASSERT (!tx->tx_md.md_active);
        LASSERT ((rd != tx->tx_rd) == !active);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        page_offset = kiov->kiov_offset + offset;

        resid = offset + nob;
        npages = 0;

        do {
                LASSERT (npages < LNET_MAX_IOV);
                LASSERT (nkiov > 0);

                if ((npages > 0 && kiov->kiov_offset != 0) ||
                    (resid > kiov->kiov_len &&
                     (kiov->kiov_offset + kiov->kiov_len) != PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR ("Can't make payload contiguous in I/O VM: "
                                "page %d, offset %d, len %d\n",
                                npages, kiov->kiov_offset, kiov->kiov_len);

                        return -EINVAL;
                }

                tx->tx_pages[npages++] = lnet_page2phys(kiov->kiov_page);
                resid -= kiov->kiov_len;
                kiov++;
                nkiov--;
        } while (resid > 0);

        return kibnal_map_tx(tx, rd, active, npages, page_offset, nob);
}
#endif

kib_conn_t *
kibnal_find_conn_locked (kib_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->ibp_conns) {
                return (list_entry(tmp, kib_conn_t, ibc_list));
        }

        return (NULL);
}

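/* Service a connection's send queues as far as credits and the send window
 * allow.  Reserved-credit sends are promoted first; a NOOP is queued if
 * credits must be returned (or a keepalive is due); and the last credit is
 * never spent unless credits are owed back, which stops both peers running
 * dry at once with no way to restock each other. */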
void
kibnal_check_sends (kib_conn_t *conn)
{
        kib_tx_t       *tx;
        vv_return_t     vvrc;
        int             rc;
        int             consume_cred;
        int             done;

        /* Don't send anything until after the connection is established */
        if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
                CDEBUG(D_NET, "%s too soon\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                return;
        }

        spin_lock(&conn->ibc_lock);

        LASSERT (conn->ibc_nsends_posted <=
                 *kibnal_tunables.kib_concurrent_sends);
        LASSERT (conn->ibc_reserved_credits >= 0);

        while (conn->ibc_reserved_credits > 0 &&
               !list_empty(&conn->ibc_tx_queue_rsrvd)) {
                LASSERT (conn->ibc_version !=
                         IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
                tx = list_entry(conn->ibc_tx_queue_rsrvd.next,
                                kib_tx_t, tx_list);
                list_del(&tx->tx_list);
                list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
                conn->ibc_reserved_credits--;
        }

        if (list_empty(&conn->ibc_tx_queue) &&
            list_empty(&conn->ibc_tx_queue_nocred) &&
            (conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER ||
             kibnal_send_keepalive(conn))) {
                spin_unlock(&conn->ibc_lock);

                tx = kibnal_get_idle_tx();
                if (tx != NULL)
                        kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);

                spin_lock(&conn->ibc_lock);

                if (tx != NULL)
                        kibnal_queue_tx_locked(tx, conn);
        }

        for (;;) {
                if (!list_empty(&conn->ibc_tx_queue_nocred)) {
                        LASSERT (conn->ibc_version !=
                                 IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
                        tx = list_entry (conn->ibc_tx_queue_nocred.next,
                                         kib_tx_t, tx_list);
                        consume_cred = 0;
                } else if (!list_empty (&conn->ibc_tx_queue)) {
                        tx = list_entry (conn->ibc_tx_queue.next,
                                         kib_tx_t, tx_list);
                        consume_cred = 1;
                } else {
                        /* nothing waiting */
                        break;
                }

                LASSERT (tx->tx_queued);
                /* We rely on this for QP sizing */
                LASSERT (tx->tx_nwrq > 0 && tx->tx_nwrq <= 1 + IBNAL_MAX_RDMA_FRAGS);

                LASSERT (conn->ibc_outstanding_credits >= 0);
                LASSERT (conn->ibc_outstanding_credits <= IBNAL_MSG_QUEUE_SIZE);
                LASSERT (conn->ibc_credits >= 0);
                LASSERT (conn->ibc_credits <= IBNAL_MSG_QUEUE_SIZE);

                if (conn->ibc_nsends_posted ==
                    *kibnal_tunables.kib_concurrent_sends) {
                        /* We've got some tx completions outstanding... */
                        CDEBUG(D_NET, "%s: posted enough\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        break;
                }

                if (consume_cred) {
                        if (conn->ibc_credits == 0) {   /* no credits */
                                CDEBUG(D_NET, "%s: no credits\n",
                                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                                break;
                        }

                        if (conn->ibc_credits == 1 &&   /* last credit reserved for */
                            conn->ibc_outstanding_credits == 0) { /* giving back credits */
                                CDEBUG(D_NET, "%s: not using last credit\n",
                                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                                break;
                        }
                }

                list_del (&tx->tx_list);
                tx->tx_queued = 0;

                /* NB don't drop ibc_lock before bumping tx_sending */

                if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
                    (!list_empty(&conn->ibc_tx_queue) ||
                     !list_empty(&conn->ibc_tx_queue_nocred) ||
                     (conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER &&
                      !kibnal_send_keepalive(conn)))) {
                        /* redundant NOOP */
                        spin_unlock(&conn->ibc_lock);
                        kibnal_tx_done(tx);
                        spin_lock(&conn->ibc_lock);
                        CDEBUG(D_NET, "%s: redundant noop\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        continue;
                }

                kibnal_pack_msg(tx->tx_msg, conn->ibc_version,
                                conn->ibc_outstanding_credits,
                                conn->ibc_peer->ibp_nid, conn->ibc_incarnation,
                                conn->ibc_txseq);

                conn->ibc_txseq++;
                conn->ibc_outstanding_credits = 0;
                conn->ibc_nsends_posted++;
                if (consume_cred)
                        conn->ibc_credits--;

                /* CAVEAT EMPTOR!  This tx could be the PUT_DONE of an RDMA
                 * PUT.  If so, it was first queued here as a PUT_REQ, sent and
                 * stashed on ibc_active_txs, matched by an incoming PUT_ACK,
                 * and then re-queued here.  It's (just) possible that
                 * tx_sending is non-zero if we've not done the tx_complete() from
                 * the first send; hence the ++ rather than = below. */
                tx->tx_sending++;

                list_add (&tx->tx_list, &conn->ibc_active_txs);

                /* Keep holding ibc_lock while posting sends on this
                 * connection; vv_post_send() isn't re-entrant on the same
                 * QP!! */

                LASSERT (tx->tx_nwrq > 0);
#if 0
                if (tx->tx_wrq[0].wr_type == vv_wr_rdma_write)
                        CDEBUG(D_NET, "WORK[0]: RDMA gl %p for %d k %x -> "LPX64" k %x\n",
                               tx->tx_wrq[0].scatgat_list->v_address,
                               tx->tx_wrq[0].scatgat_list->length,
                               tx->tx_wrq[0].scatgat_list->l_key,
                               tx->tx_wrq[0].type.send.send_qp_type.rc_type.r_addr,
                               tx->tx_wrq[0].type.send.send_qp_type.rc_type.r_r_key);
                else
                        CDEBUG(D_NET, "WORK[0]: %s gl %p for %d k %x\n",
                               tx->tx_wrq[0].wr_type == vv_wr_send ? "SEND" : "????",
                               tx->tx_wrq[0].scatgat_list->v_address,
                               tx->tx_wrq[0].scatgat_list->length,
                               tx->tx_wrq[0].scatgat_list->l_key);

                if (tx->tx_nwrq > 1) {
                        if (tx->tx_wrq[1].wr_type == vv_wr_rdma_write)
                                CDEBUG(D_NET, "WORK[1]: RDMA gl %p for %d k %x -> "LPX64" k %x\n",
                                       tx->tx_wrq[1].scatgat_list->v_address,
                                       tx->tx_wrq[1].scatgat_list->length,
                                       tx->tx_wrq[1].scatgat_list->l_key,
                                       tx->tx_wrq[1].type.send.send_qp_type.rc_type.r_addr,
                                       tx->tx_wrq[1].type.send.send_qp_type.rc_type.r_r_key);
                        else
                                CDEBUG(D_NET, "WORK[1]: %s gl %p for %d k %x\n",
                                       tx->tx_wrq[1].wr_type == vv_wr_send ? "SEND" : "????",
                                       tx->tx_wrq[1].scatgat_list->v_address,
                                       tx->tx_wrq[1].scatgat_list->length,
                                       tx->tx_wrq[1].scatgat_list->l_key);
                }
#endif
                rc = -ECONNABORTED;
                vvrc = vv_return_ok;
                if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                        tx->tx_status = 0;
                        vvrc = vv_post_send_list(kibnal_data.kib_hca,
                                                 conn->ibc_qp,
                                                 tx->tx_nwrq,
                                                 tx->tx_wrq,
                                                 vv_operation_type_send_rc);
                        rc = (vvrc == vv_return_ok) ? 0 : -EIO;
                }

                conn->ibc_last_send = jiffies;

                if (rc != 0) {
                        /* NB credits are transferred in the actual
                         * message, which can only be the last work item */
                        conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
                        if (consume_cred)
                                conn->ibc_credits++;
                        conn->ibc_nsends_posted--;

                        tx->tx_status = rc;
                        tx->tx_waiting = 0;
                        tx->tx_sending--;

                        done = (tx->tx_sending == 0);
                        if (done)
                                list_del (&tx->tx_list);

                        spin_unlock(&conn->ibc_lock);

                        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
                                CERROR ("Error %d posting transmit to %s\n",
                                        vvrc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        else
                                CDEBUG (D_NET, "Error %d posting transmit to %s\n",
                                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));

                        kibnal_close_conn (conn, rc);

                        if (done)
                                kibnal_tx_done (tx);
                        return;
                }
        }

        spin_unlock(&conn->ibc_lock);
}

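/* Completion handler for a send work request.  This can race with the RDMA
 * completion and with PUT_ACK handling re-queueing the tx: whoever observes
 * the tx finally idle (not sending, not waiting, not queued) unhooks it from
 * ibc_active_txs and frees it. */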
void
kibnal_tx_complete (kib_tx_t *tx, vv_comp_status_t vvrc)
{
        kib_conn_t   *conn = tx->tx_conn;
        int           failed = (vvrc != vv_comp_status_success);
        int           idle;

        CDEBUG(D_NET, "tx %p conn %p sending %d nwrq %d vvrc %d\n",
               tx, conn, tx->tx_sending, tx->tx_nwrq, vvrc);

        LASSERT (tx->tx_sending > 0);

        if (failed &&
            tx->tx_status == 0 &&
            conn->ibc_state == IBNAL_CONN_ESTABLISHED)
                CDEBUG(D_NETERROR, "tx -> %s type %x cookie "LPX64
                       " sending %d waiting %d: failed %d\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid),
                       tx->tx_msg->ibm_type, tx->tx_cookie,
                       tx->tx_sending, tx->tx_waiting, vvrc);

        spin_lock(&conn->ibc_lock);

        /* I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'. */

        tx->tx_sending--;
        conn->ibc_nsends_posted--;

        if (failed) {
                tx->tx_waiting = 0;
                tx->tx_status = -EIO;
        }

        idle = (tx->tx_sending == 0) &&         /* This is the final callback */
               !tx->tx_waiting &&               /* Not waiting for peer */
               !tx->tx_queued;                  /* Not re-queued (PUT_DONE) */
        if (idle)
                list_del(&tx->tx_list);

        kibnal_conn_addref(conn);               /* 1 ref for me.... */

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kibnal_tx_done (tx);

        if (failed) {
                kibnal_close_conn (conn, -EIO);
        } else {
                kibnal_peer_alive(conn->ibc_peer);
                kibnal_check_sends(conn);
        }

        kibnal_conn_decref(conn);               /* ...until here */
}

void
kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob)
{
        vv_scatgat_t *gl = &tx->tx_gl[tx->tx_nwrq];
        vv_wr_t      *wrq = &tx->tx_wrq[tx->tx_nwrq];
        int           nob = offsetof (kib_msg_t, ibm_u) + body_nob;
        __u64         addr = (__u64)((unsigned long)((tx)->tx_msg));

        LASSERT (tx->tx_nwrq >= 0 &&
                 tx->tx_nwrq < (1 + IBNAL_MAX_RDMA_FRAGS));
        LASSERT (nob <= IBNAL_MSG_SIZE);

        kibnal_init_msg(tx->tx_msg, type, body_nob);

        *gl = (vv_scatgat_t) {
                .v_address = KIBNAL_ADDR2SG(addr),
                .l_key     = tx->tx_lkey,
                .length    = nob,
        };

        memset(wrq, 0, sizeof(*wrq));

        wrq->wr_id = kibnal_ptr2wreqid(tx, IBNAL_WID_TX);
        wrq->wr_type = vv_wr_send;
        wrq->scatgat_list = gl;
        wrq->num_of_data_segments = 1;
        wrq->completion_notification = 1;
        wrq->type.send.solicited_event = 1;
        wrq->type.send.immidiate_data_indicator = 0;
        wrq->type.send.send_qp_type.rc_type.fance_indicator = 0;

        tx->tx_nwrq++;
}

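/* Set up the work request(s) for an RDMA write described by the local source
 * descriptor and the peer's destination descriptor, followed by the GET_DONE
 * or PUT_DONE completion message.  In the non-FMR case the two fragment
 * lists are walked in step, emitting one write per contiguous overlap; the
 * return value is the number of bytes to transfer, or a -ve error. */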
int
kibnal_init_rdma (kib_tx_t *tx, int type, int nob,
                  kib_rdma_desc_t *dstrd, __u64 dstcookie)
{
        kib_msg_t       *ibmsg = tx->tx_msg;
        kib_rdma_desc_t *srcrd = tx->tx_rd;
        vv_scatgat_t    *gl;
        vv_wr_t         *wrq;
        int              rc;

#if IBNAL_USE_FMR
        LASSERT (tx->tx_nwrq == 0);

        gl = &tx->tx_gl[0];
        gl->length    = nob;
        gl->v_address = KIBNAL_ADDR2SG(srcrd->rd_addr);
        gl->l_key     = srcrd->rd_key;

        wrq = &tx->tx_wrq[0];

        wrq->wr_id = kibnal_ptr2wreqid(tx, IBNAL_WID_RDMA);
        wrq->completion_notification = 0;
        wrq->scatgat_list = gl;
        wrq->num_of_data_segments = 1;
        wrq->wr_type = vv_wr_rdma_write;
        wrq->type.send.solicited_event = 0;
        wrq->type.send.send_qp_type.rc_type.fance_indicator = 0;
        wrq->type.send.send_qp_type.rc_type.r_addr = dstrd->rd_addr;
        wrq->type.send.send_qp_type.rc_type.r_r_key = dstrd->rd_key;

        tx->tx_nwrq = 1;
        rc = nob;
#else
        /* CAVEAT EMPTOR: this 'consumes' the frags in 'dstrd' */
        int              resid = nob;
        kib_rdma_frag_t *srcfrag;
        int              srcidx;
        kib_rdma_frag_t *dstfrag;
        int              dstidx;
        int              wrknob;

        /* Called by scheduler */
        LASSERT (!in_interrupt());

        LASSERT (type == IBNAL_MSG_GET_DONE ||
                 type == IBNAL_MSG_PUT_DONE);

        srcidx = dstidx = 0;
        srcfrag = &srcrd->rd_frags[0];
        dstfrag = &dstrd->rd_frags[0];
        rc = resid;

        while (resid > 0) {
                if (srcidx >= srcrd->rd_nfrag) {
                        CERROR("Src buffer exhausted: %d frags\n", srcidx);
                        rc = -EPROTO;
                        break;
                }

                if (dstidx == dstrd->rd_nfrag) {
                        CERROR("Dst buffer exhausted: %d frags\n", dstidx);
                        rc = -EPROTO;
                        break;
                }

                if (tx->tx_nwrq == IBNAL_MAX_RDMA_FRAGS) {
                        CERROR("RDMA too fragmented: %d/%d src %d/%d dst frags\n",
                               srcidx, srcrd->rd_nfrag,
                               dstidx, dstrd->rd_nfrag);
                        rc = -EMSGSIZE;
                        break;
                }

                wrknob = MIN(MIN(srcfrag->rf_nob, dstfrag->rf_nob), resid);

                gl = &tx->tx_gl[tx->tx_nwrq];
                gl->v_address = KIBNAL_ADDR2SG(kibnal_rf_addr(srcfrag));
                gl->length    = wrknob;
                gl->l_key     = srcrd->rd_key;

                wrq = &tx->tx_wrq[tx->tx_nwrq];

                wrq->wr_id = kibnal_ptr2wreqid(tx, IBNAL_WID_RDMA);
                wrq->completion_notification = 0;
                wrq->scatgat_list = gl;
                wrq->num_of_data_segments = 1;
                wrq->wr_type = vv_wr_rdma_write;
                wrq->type.send.solicited_event = 0;
                wrq->type.send.send_qp_type.rc_type.fance_indicator = 0;
                wrq->type.send.send_qp_type.rc_type.r_addr = kibnal_rf_addr(dstfrag);
                wrq->type.send.send_qp_type.rc_type.r_r_key = dstrd->rd_key;

                resid -= wrknob;
                if (wrknob < srcfrag->rf_nob) {
                        kibnal_rf_set(srcfrag,
                                      kibnal_rf_addr(srcfrag) + wrknob,
                                      srcfrag->rf_nob - wrknob);
                } else {
                        srcfrag++;
                        srcidx++;
                }

                if (wrknob < dstfrag->rf_nob) {
                        kibnal_rf_set(dstfrag,
                                      kibnal_rf_addr(dstfrag) + wrknob,
                                      dstfrag->rf_nob - wrknob);
                } else {
                        dstfrag++;
                        dstidx++;
                }

                tx->tx_nwrq++;
        }

        if (rc < 0)                             /* no RDMA if completing with failure */
                tx->tx_nwrq = 0;
#endif

        ibmsg->ibm_u.completion.ibcm_status = rc;
        ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;
        kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));

        return rc;
}

void
kibnal_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
{
        spin_lock(&conn->ibc_lock);
        kibnal_queue_tx_locked (tx, conn);
        spin_unlock(&conn->ibc_lock);

        kibnal_check_sends(conn);
}

void
kibnal_schedule_peer_arp (kib_peer_t *peer)
{
        unsigned long flags;

        LASSERT (peer->ibp_connecting != 0);
        LASSERT (peer->ibp_arp_count > 0);

        kibnal_peer_addref(peer); /* extra ref for connd */

        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);

        list_add_tail (&peer->ibp_connd_list, &kibnal_data.kib_connd_peers);
        wake_up (&kibnal_data.kib_connd_waitq);

        spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
}

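/* Commit to sending 'tx' to 'nid'.  Fast path: queue it on an existing
 * connection found under the read lock.  Otherwise retry under the write
 * lock, creating a persistent peer and kicking off ARP/connection
 * establishment as required; any failure from here on completes the tx with
 * an error rather than returning one. */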
void
kibnal_launch_tx (kib_tx_t *tx, lnet_nid_t nid)
{
        kib_peer_t      *peer;
        kib_conn_t      *conn;
        unsigned long    flags;
        rwlock_t        *g_lock = &kibnal_data.kib_global_lock;
        int              retry;
        int              rc;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
        LASSERT (tx->tx_nwrq > 0);              /* work items have been set up */

        for (retry = 0; ; retry = 1) {
                read_lock_irqsave(g_lock, flags);

                peer = kibnal_find_peer_locked (nid);
                if (peer != NULL) {
                        conn = kibnal_find_conn_locked (peer);
                        if (conn != NULL) {
                                kibnal_conn_addref(conn); /* 1 ref for me... */
                                read_unlock_irqrestore(g_lock, flags);

                                kibnal_queue_tx (tx, conn);
                                kibnal_conn_decref(conn); /* ...to here */
                                return;
                        }
                }

                /* Making one or more connections; I'll need a write lock... */
                read_unlock(g_lock);
                write_lock(g_lock);

                peer = kibnal_find_peer_locked (nid);
                if (peer != NULL)
                        break;

                write_unlock_irqrestore(g_lock, flags);

                if (retry) {
                        CERROR("Can't find peer %s\n", libcfs_nid2str(nid));

                        tx->tx_status = -EHOSTUNREACH;
                        tx->tx_waiting = 0;
                        kibnal_tx_done (tx);
                        return;
                }

                rc = kibnal_add_persistent_peer(nid, LNET_NIDADDR(nid));
                if (rc != 0) {
                        CERROR("Can't add peer %s: %d\n",
                               libcfs_nid2str(nid), rc);

                        tx->tx_status = -EHOSTUNREACH;
                        tx->tx_waiting = 0;
                        kibnal_tx_done (tx);
                        return;
                }
        }

        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kibnal_conn_addref(conn);       /* 1 ref for me... */
                write_unlock_irqrestore(g_lock, flags);

                kibnal_queue_tx (tx, conn);
                kibnal_conn_decref(conn);       /* ...until here */
                return;
        }

        if (peer->ibp_connecting == 0 &&
            peer->ibp_accepting == 0) {
                if (!(peer->ibp_reconnect_interval == 0 || /* first attempt */
                      time_after_eq(jiffies, peer->ibp_reconnect_time))) {
                        write_unlock_irqrestore(g_lock, flags);
                        tx->tx_status = -EHOSTUNREACH;
                        tx->tx_waiting = 0;
                        kibnal_tx_done (tx);
                        return;
                }

                peer->ibp_connecting = 1;
                peer->ibp_arp_count = 1 + *kibnal_tunables.kib_arp_retries;
                kibnal_schedule_peer_arp(peer);
        }

        /* A connection is being established; queue the message... */
        list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}

int
kibnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
        int               type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      payload_niov = lntmsg->msg_niov;
        struct iovec     *payload_iov = lntmsg->msg_iov;
        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
        unsigned int      payload_offset = lntmsg->msg_offset;
        unsigned int      payload_nob = lntmsg->msg_len;
        kib_msg_t        *ibmsg;
        kib_tx_t         *tx;
        int               nob;
        int               rc;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
               payload_nob, payload_niov, libcfs_id2str(target));

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= LNET_MAX_IOV);

        /* Thread context */
        LASSERT (!in_interrupt());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        switch (type) {
        default:
                LBUG();
                return (-EIO);

        case LNET_MSG_ACK:
                LASSERT (payload_nob == 0);
                break;

        case LNET_MSG_GET:
                if (routing || target_is_router)
                        break;                  /* send IMMEDIATE */

                /* is the REPLY message too small for RDMA? */
                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
                if (nob <= IBNAL_MSG_SIZE)
                        break;                  /* send IMMEDIATE */

                tx = kibnal_get_idle_tx();
                if (tx == NULL) {
1482                         CERROR("Can't allocate txd for GET to %s\n",
1483                                libcfs_nid2str(target.nid));
1484                         return -ENOMEM;
1485                 }
1486
1487                 ibmsg = tx->tx_msg;
1488                 ibmsg->ibm_u.get.ibgm_hdr = *hdr;
1489                 ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie;
1490
1491                 if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
1492                         rc = kibnal_setup_rd_iov(tx, &ibmsg->ibm_u.get.ibgm_rd,
1493                                                  vv_acc_r_mem_write,
1494                                                  lntmsg->msg_md->md_niov,
1495                                                  lntmsg->msg_md->md_iov.iov,
1496                                                  0, lntmsg->msg_md->md_length);
1497                 else
1498                         rc = kibnal_setup_rd_kiov(tx, &ibmsg->ibm_u.get.ibgm_rd,
1499                                                   vv_acc_r_mem_write,
1500                                                   lntmsg->msg_md->md_niov,
1501                                                   lntmsg->msg_md->md_iov.kiov,
1502                                                   0, lntmsg->msg_md->md_length);
1503                 if (rc != 0) {
1504                         CERROR("Can't setup GET sink for %s: %d\n",
1505                                libcfs_nid2str(target.nid), rc);
1506                         kibnal_tx_done(tx);
1507                         return -EIO;
1508                 }
1509
1510 #if IBNAL_USE_FMR
1511                 nob = sizeof(kib_get_msg_t);
1512 #else
1513                 {
1514                         int n = ibmsg->ibm_u.get.ibgm_rd.rd_nfrag;
1515
1516                         nob = offsetof(kib_get_msg_t, ibgm_rd.rd_frags[n]);
1517                 }
1518 #endif
1519                 kibnal_init_tx_msg(tx, IBNAL_MSG_GET_REQ, nob);
1520
1521                 tx->tx_lntmsg[1] = lnet_create_reply_msg(kibnal_data.kib_ni,
1522                                                          lntmsg);
1523                 if (tx->tx_lntmsg[1] == NULL) {
1524                         CERROR("Can't create reply for GET -> %s\n",
1525                                libcfs_nid2str(target.nid));
1526                         kibnal_tx_done(tx);
1527                         return -EIO;
1528                 }
1529
1530                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg[0,1] on completion */
1531                 tx->tx_waiting = 1;             /* waiting for GET_DONE */
1532                 kibnal_launch_tx(tx, target.nid);
1533                 return 0;
1534
1535         case LNET_MSG_REPLY:
1536         case LNET_MSG_PUT:
1537                 /* Is the payload small enough not to need RDMA? */
1538                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1539                 if (nob <= IBNAL_MSG_SIZE)
1540                         break;                  /* send IMMEDIATE */
1541
1542                 tx = kibnal_get_idle_tx();
1543                 if (tx == NULL) {
1544                         CERROR("Can't allocate %s txd for %s\n",
1545                                type == LNET_MSG_PUT ? "PUT" : "REPLY",
1546                                libcfs_nid2str(target.nid));
1547                         return -ENOMEM;
1548                 }
1549
1550                 if (payload_kiov == NULL)
1551                         rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 0,
1552                                                  payload_niov, payload_iov,
1553                                                  payload_offset, payload_nob);
1554                 else
1555                         rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 0,
1556                                                   payload_niov, payload_kiov,
1557                                                   payload_offset, payload_nob);
1558                 if (rc != 0) {
1559                         CERROR("Can't setup PUT src for %s: %d\n",
1560                                libcfs_nid2str(target.nid), rc);
1561                         kibnal_tx_done(tx);
1562                         return -EIO;
1563                 }
1564
1565                 ibmsg = tx->tx_msg;
1566                 ibmsg->ibm_u.putreq.ibprm_hdr = *hdr;
1567                 ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie;
1568                 kibnal_init_tx_msg(tx, IBNAL_MSG_PUT_REQ, sizeof(kib_putreq_msg_t));
1569
1570                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1571                 tx->tx_waiting = 1;             /* waiting for PUT_{ACK,NAK} */
1572                 kibnal_launch_tx(tx, target.nid);
1573                 return 0;
1574         }
1575
1576         /* send IMMEDIATE */
1577
1578         LASSERT (offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob])
1579                  <= IBNAL_MSG_SIZE);
1580
1581         tx = kibnal_get_idle_tx();
1582         if (tx == NULL) {
1583                 CERROR ("Can't send %d to %s: tx descs exhausted\n",
1584                         type, libcfs_nid2str(target.nid));
1585                 return -ENOMEM;
1586         }
1587
1588         ibmsg = tx->tx_msg;
1589         ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1590
1591         if (payload_kiov != NULL)
1592                 lnet_copy_kiov2flat(IBNAL_MSG_SIZE, ibmsg,
1593                                     offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1594                                     payload_niov, payload_kiov,
1595                                     payload_offset, payload_nob);
1596         else
1597                 lnet_copy_iov2flat(IBNAL_MSG_SIZE, ibmsg,
1598                                    offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1599                                    payload_niov, payload_iov,
1600                                    payload_offset, payload_nob);
1601
1602         nob = offsetof(kib_immediate_msg_t, ibim_payload[payload_nob]);
1603         kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE, nob);
1604
1605         tx->tx_lntmsg[0] = lntmsg;              /* finalise lntmsg on completion */
1606         kibnal_launch_tx(tx, target.nid);
1607         return 0;
1608 }
1609
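/* Respond to an optimized GET: RDMA lntmsg's payload directly into the sink
 * descriptor the peer sent in its GET_REQ, completing with GET_DONE.  A zero
 * length reply needs no RDMA, so it can be finalised immediately and
 * GET_DONE alone carries the status. */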
1610 void
1611 kibnal_reply (lnet_ni_t *ni, kib_rx_t *rx, lnet_msg_t *lntmsg)
1612 {
1613         lnet_process_id_t target = lntmsg->msg_target;
1614         unsigned int      niov = lntmsg->msg_niov;
1615         struct iovec     *iov = lntmsg->msg_iov;
1616         lnet_kiov_t      *kiov = lntmsg->msg_kiov;
1617         unsigned int      offset = lntmsg->msg_offset;
1618         unsigned int      nob = lntmsg->msg_len;
1619         kib_tx_t         *tx;
1620         int               rc;
1621
1622         tx = kibnal_get_idle_tx();
1623         if (tx == NULL) {
1624                 CERROR("Can't get tx for REPLY to %s\n",
1625                        libcfs_nid2str(target.nid));
1626                 goto failed_0;
1627         }
1628
1629         if (nob == 0)
1630                 rc = 0;
1631         else if (kiov == NULL)
1632                 rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 0,
1633                                          niov, iov, offset, nob);
1634         else
1635                 rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 0,
1636                                           niov, kiov, offset, nob);
1637
1638         if (rc != 0) {
1639                 CERROR("Can't setup GET src for %s: %d\n",
1640                        libcfs_nid2str(target.nid), rc);
1641                 goto failed_1;
1642         }
1643
1644         rc = kibnal_init_rdma(tx, IBNAL_MSG_GET_DONE, nob,
1645                               &rx->rx_msg->ibm_u.get.ibgm_rd,
1646                               rx->rx_msg->ibm_u.get.ibgm_cookie);
1647         if (rc < 0) {
1648                 CERROR("Can't setup rdma for GET from %s: %d\n",
1649                        libcfs_nid2str(target.nid), rc);
1650                 goto failed_1;
1651         }
1652
1653         if (rc == 0) {
1654                 /* No RDMA: local completion may happen now! */
1655                 lnet_finalize(ni, lntmsg, 0);
1656         } else {
1657                 /* RDMA: lnet_finalize(lntmsg) when it
1658                  * completes */
1659                 tx->tx_lntmsg[0] = lntmsg;
1660         }
1661
1662         kibnal_queue_tx(tx, rx->rx_conn);
1663         return;
1664
1665  failed_1:
1666         kibnal_tx_done(tx);
1667  failed_0:
1668         lnet_finalize(ni, lntmsg, -EIO);
1669 }
1670
1671 int
1672 kibnal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
1673                    void **new_private)
1674 {
1675         kib_rx_t    *rx = private;
1676         kib_conn_t  *conn = rx->rx_conn;
1677
1678         if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {
1679                 /* Can't block if RDMA completions need normal credits */
1680                 LCONSOLE_ERROR_MSG(0x129, "Dropping message from %s: no buffers"
1681                                    " free. %s is running an old version of LNET "
1682                                    "that may deadlock if messages wait for "
1683                                    "buffers\n",
1684                                    libcfs_nid2str(conn->ibc_peer->ibp_nid),
1685                                    libcfs_nid2str(conn->ibc_peer->ibp_nid));
1686                 return -EDEADLK;
1687         }
1688
1689         *new_private = private;
1690         return 0;
1691 }
1692
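/* Receive into 'lntmsg' according to message type: IMMEDIATE payloads are
 * copied straight out of the rx buffer; PUT_REQ gets a sink set up and a
 * PUT_ACK (or PUT_NAK) in response; GET_REQ either RDMAs the matched reply
 * via kibnal_reply() or completes with -ENODATA.  The rx is reposted before
 * returning. */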
1693 int
1694 kibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
1695              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
1696              unsigned int offset, unsigned int mlen, unsigned int rlen)
1697 {
1698         kib_rx_t    *rx = private;
1699         kib_msg_t   *rxmsg = rx->rx_msg;
1700         kib_conn_t  *conn = rx->rx_conn;
1701         kib_tx_t    *tx;
1702         kib_msg_t   *txmsg;
1703         int          nob;
1704         int          post_cred = 1;
1705         int          rc = 0;
1706
1707         LASSERT (mlen <= rlen);
1708         LASSERT (!in_interrupt());
1709         /* Either all pages or all vaddrs */
1710         LASSERT (!(kiov != NULL && iov != NULL));
1711
1712         switch (rxmsg->ibm_type) {
1713         default:
1714                 LBUG();
1715
1716         case IBNAL_MSG_IMMEDIATE:
1717                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
1718                 if (nob > rx->rx_nob) {
1719                         CERROR ("Immediate message from %s too big: %d (%d received)\n",
1720                                 libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
1721                                 nob, rx->rx_nob);
1722                         rc = -EPROTO;
1723                         break;
1724                 }
1725
1726                 if (kiov != NULL)
1727                         lnet_copy_flat2kiov(niov, kiov, offset,
1728                                             IBNAL_MSG_SIZE, rxmsg,
1729                                             offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1730                                             mlen);
1731                 else
1732                         lnet_copy_flat2iov(niov, iov, offset,
1733                                            IBNAL_MSG_SIZE, rxmsg,
1734                                            offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1735                                            mlen);
1736                 lnet_finalize (ni, lntmsg, 0);
1737                 break;
1738
1739         case IBNAL_MSG_PUT_REQ:
1740                 if (mlen == 0) {
1741                         lnet_finalize(ni, lntmsg, 0);
1742                         kibnal_send_completion(conn, IBNAL_MSG_PUT_NAK, 0,
1743                                                rxmsg->ibm_u.putreq.ibprm_cookie);
1744                         break;
1745                 }
1746
1747                 tx = kibnal_get_idle_tx();
1748                 if (tx == NULL) {
1749                         CERROR("Can't allocate tx for %s\n",
1750                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
1751                         /* Not replying will break the connection */
1752                         rc = -ENOMEM;
1753                         break;
1754                 }
1755
1756                 txmsg = tx->tx_msg;
1757                 if (kiov == NULL)
1758                         rc = kibnal_setup_rd_iov(tx,
1759                                                  &txmsg->ibm_u.putack.ibpam_rd,
1760                                                  vv_acc_r_mem_write,
1761                                                  niov, iov, offset, mlen);
1762                 else
1763                         rc = kibnal_setup_rd_kiov(tx,
1764                                                   &txmsg->ibm_u.putack.ibpam_rd,
1765                                                   vv_acc_r_mem_write,
1766                                                   niov, kiov, offset, mlen);
1767                 if (rc != 0) {
1768                         CERROR("Can't setup PUT sink for %s: %d\n",
1769                                libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
1770                         kibnal_tx_done(tx);
1771                         /* tell peer it's over */
1772                         kibnal_send_completion(conn, IBNAL_MSG_PUT_NAK, rc,
1773                                                rxmsg->ibm_u.putreq.ibprm_cookie);
1774                         break;
1775                 }
1776
1777                 txmsg->ibm_u.putack.ibpam_src_cookie = rxmsg->ibm_u.putreq.ibprm_cookie;
1778                 txmsg->ibm_u.putack.ibpam_dst_cookie = tx->tx_cookie;
1779 #if IBNAL_USE_FMR
1780                 nob = sizeof(kib_putack_msg_t);
1781 #else
1782                 {
1783                         int n = tx->tx_msg->ibm_u.putack.ibpam_rd.rd_nfrag;
1784
1785                         nob = offsetof(kib_putack_msg_t, ibpam_rd.rd_frags[n]);
1786                 }
1787 #endif
1788                 kibnal_init_tx_msg(tx, IBNAL_MSG_PUT_ACK, nob);
1789
1790                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1791                 tx->tx_waiting = 1;             /* waiting for PUT_DONE */
1792                 kibnal_queue_tx(tx, conn);
1793
1794                 if (conn->ibc_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD)
1795                         post_cred = 0; /* peer still owns 'rx' for sending PUT_DONE */
1796                 break;
1797
1798         case IBNAL_MSG_GET_REQ:
1799                 if (lntmsg != NULL) {
1800                         /* Optimized GET; RDMA lntmsg's payload */
1801                         kibnal_reply(ni, rx, lntmsg);
1802                 } else {
1803                         /* GET didn't match anything */
1804                         kibnal_send_completion(conn, IBNAL_MSG_GET_DONE, -ENODATA,
1805                                                rxmsg->ibm_u.get.ibgm_cookie);
1806                 }
1807                 break;
1808         }
1809
1810         kibnal_post_rx(rx, post_cred, 0);
1811         return rc;
1812 }
1813
1814 int
1815 kibnal_thread_start (int (*fn)(void *arg), void *arg)
1816 {
1817         long    pid = kernel_thread (fn, arg, 0);
1818
1819         if (pid < 0)
1820                 return ((int)pid);
1821
1822         atomic_inc (&kibnal_data.kib_nthreads);
1823         return (0);
1824 }
1825
1826 void
1827 kibnal_thread_fini (void)
1828 {
1829         atomic_dec (&kibnal_data.kib_nthreads);
1830 }
1831
1832 void
1833 kibnal_peer_alive (kib_peer_t *peer)
1834 {
1835         /* This is racy, but everyone's only writing cfs_time_current() */
1836         peer->ibp_last_alive = cfs_time_current();
1837         mb();
1838 }
1839
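/* Tell LNet the peer is dead, but only once its last conn has gone, no
 * connection attempt remains in flight and a comms error was recorded.  The
 * last-alive time is estimated from ibp_last_alive under the read lock. */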
1840 void
1841 kibnal_peer_notify (kib_peer_t *peer)
1842 {
1843         time_t        last_alive = 0;
1844         int           error = 0;
1845         unsigned long flags;
1846
1847         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1848
1849         if (list_empty(&peer->ibp_conns) &&
1850             peer->ibp_accepting == 0 &&
1851             peer->ibp_connecting == 0 &&
1852             peer->ibp_error != 0) {
1853                 error = peer->ibp_error;
1854                 peer->ibp_error = 0;
1855
1856                 last_alive = cfs_time_current_sec() -
1857                              cfs_duration_sec(cfs_time_current() -
1858                                               peer->ibp_last_alive);
1859         }
1860
1861         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1862
1863         if (error != 0)
1864                 lnet_notify(kibnal_data.kib_ni, peer->ibp_nid, 0, last_alive);
1865 }
1866
1867 void
1868 kibnal_schedule_conn (kib_conn_t *conn)
1869 {
1870         unsigned long flags;
1871
1872         kibnal_conn_addref(conn);               /* ++ref for connd */
1873
1874         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
1875
1876         list_add_tail (&conn->ibc_list, &kibnal_data.kib_connd_conns);
1877         wake_up (&kibnal_data.kib_connd_waitq);
1878
1879         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
1880 }
1881
1882 void
1883 kibnal_close_conn_locked (kib_conn_t *conn, int error)
1884 {
1885         /* This just does the immediate housekeeping.  'error' is zero for a
1886          * normal shutdown which can happen only after the connection has been
1887          * established.  If the connection is established, schedule the
1888          * connection to be finished off by the connd.  Otherwise the connd is
1889          * already dealing with it (either to set it up or tear it down).
1890          * Caller holds kib_global_lock exclusively in irq context */
1891         kib_peer_t       *peer = conn->ibc_peer;
1892
1893         LASSERT (error != 0 || conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
1894
1895         if (error != 0 && conn->ibc_comms_error == 0)
1896                 conn->ibc_comms_error = error;
1897
1898         if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
1899                 return; /* already being handled  */
1900
1901         /* NB Can't take ibc_lock here (could be in IRQ context) without
1902          * risking deadlock, so access to ibc_{tx_queue,active_txs} is racy */
1903
1904         if (error == 0 &&
1905             list_empty(&conn->ibc_tx_queue) &&
1906             list_empty(&conn->ibc_tx_queue_rsrvd) &&
1907             list_empty(&conn->ibc_tx_queue_nocred) &&
1908             list_empty(&conn->ibc_active_txs)) {
1909                 CDEBUG(D_NET, "closing conn to %s"
1910                        " rx# "LPD64" tx# "LPD64"\n",
1911                        libcfs_nid2str(peer->ibp_nid),
1912                        conn->ibc_txseq, conn->ibc_rxseq);
1913         } else {
1914                 CDEBUG(D_NETERROR, "Closing conn to %s: error %d%s%s%s%s"
1915                        " rx# "LPD64" tx# "LPD64"\n",
1916                        libcfs_nid2str(peer->ibp_nid), error,
1917                        list_empty(&conn->ibc_tx_queue) ? "" : "(sending)",
1918                        list_empty(&conn->ibc_tx_queue_rsrvd) ? "" : "(sending_rsrvd)",
1919                        list_empty(&conn->ibc_tx_queue_nocred) ? "" : "(sending_nocred)",
1920                        list_empty(&conn->ibc_active_txs) ? "" : "(waiting)",
1921                        conn->ibc_txseq, conn->ibc_rxseq);
1922         }
1923
1924         list_del (&conn->ibc_list);
1925
1926         if (list_empty (&peer->ibp_conns)) {   /* no more conns */
1927                 if (peer->ibp_persistence == 0 && /* non-persistent peer */
1928                     kibnal_peer_active(peer))     /* still in peer table */
1929                         kibnal_unlink_peer_locked (peer);
1930
1931                 /* set/clear error on last conn */
1932                 peer->ibp_error = conn->ibc_comms_error;
1933         }
1934
1935         kibnal_set_conn_state(conn, IBNAL_CONN_DISCONNECT1);
1936
1937         kibnal_schedule_conn(conn);
1938         kibnal_conn_decref(conn);               /* lose ibc_list's ref */
1939 }
1940
1941 void
1942 kibnal_close_conn (kib_conn_t *conn, int error)
1943 {
1944         unsigned long flags;
1945
1946         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1947
1948         kibnal_close_conn_locked (conn, error);
1949
1950         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1951 }
1952
1953 void
1954 kibnal_handle_early_rxs(kib_conn_t *conn)
1955 {
1956         unsigned long    flags;
1957         kib_rx_t        *rx;
1958
1959         LASSERT (!in_interrupt());
1960         LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
1961
1962         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1963         while (!list_empty(&conn->ibc_early_rxs)) {
1964                 rx = list_entry(conn->ibc_early_rxs.next,
1965                                 kib_rx_t, rx_list);
1966                 list_del(&rx->rx_list);
1967                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1968
1969                 kibnal_handle_rx(rx);
1970
1971                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1972         }
1973         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1974 }
1975
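/* Complete every tx on 'txs' with -ECONNABORTED.  Only descriptors with no
 * send still outstanding can be finished here; anything with tx_sending != 0
 * stays put and completes via the send completion path once the QP (moved to
 * the error state by kibnal_conn_disconnected) flushes its work items. */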
1976 void
1977 kibnal_abort_txs(kib_conn_t *conn, struct list_head *txs)
1978 {
1979         LIST_HEAD           (zombies);
1980         struct list_head    *tmp;
1981         struct list_head    *nxt;
1982         kib_tx_t            *tx;
1983
1984         spin_lock(&conn->ibc_lock);
1985
1986         list_for_each_safe (tmp, nxt, txs) {
1987                 tx = list_entry (tmp, kib_tx_t, tx_list);
1988
1989                 if (txs == &conn->ibc_active_txs) {
1990                         LASSERT (!tx->tx_queued);
1991                         LASSERT (tx->tx_waiting || tx->tx_sending != 0);
1992                 } else {
1993                         LASSERT (tx->tx_queued);
1994                 }
1995
1996                 tx->tx_status = -ECONNABORTED;
1997                 tx->tx_queued = 0;
1998                 tx->tx_waiting = 0;
1999
2000                 if (tx->tx_sending == 0) {
2001                         list_del (&tx->tx_list);
2002                         list_add (&tx->tx_list, &zombies);
2003                 }
2004         }
2005
2006         spin_unlock(&conn->ibc_lock);
2007
2008         kibnal_txlist_done(&zombies, -ECONNABORTED);
2009 }
2010
2011 void
2012 kibnal_conn_disconnected(kib_conn_t *conn)
2013 {
2014         /* I'm the connd */
2015         LASSERT (!in_interrupt());
2016         LASSERT (current == kibnal_data.kib_connd);
2017         LASSERT (conn->ibc_state >= IBNAL_CONN_INIT);
2018
2019         kibnal_set_conn_state(conn, IBNAL_CONN_DISCONNECTED);
2020
2021         /* move QP to error state to make posted work items complete */
2022         kibnal_set_qp_state(conn, vv_qp_state_error);
2023
2024         /* Complete all tx descs not waiting for sends to complete.
2025          * NB we should be safe from RDMA now that the QP has changed state */
2026
2027         kibnal_abort_txs(conn, &conn->ibc_tx_queue);
2028         kibnal_abort_txs(conn, &conn->ibc_tx_queue_rsrvd);
2029         kibnal_abort_txs(conn, &conn->ibc_tx_queue_nocred);
2030         kibnal_abort_txs(conn, &conn->ibc_active_txs);
2031
2032         kibnal_handle_early_rxs(conn);
2033
2034         kibnal_peer_notify(conn->ibc_peer);
2035 }
2036
2037 void
2038 kibnal_peer_connect_failed (kib_peer_t *peer, int active, int error)
2039 {
2040         LIST_HEAD        (zombies);
2041         unsigned long     flags;
2042
2043         /* Only the connd creates conns => single threaded */
2044         LASSERT (error != 0);
2045         LASSERT (!in_interrupt());
2046         LASSERT (current == kibnal_data.kib_connd);
2047
2048         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2049
2050         if (active) {
2051                 LASSERT (peer->ibp_connecting != 0);
2052                 peer->ibp_connecting--;
2053         } else {
2054                 LASSERT (peer->ibp_accepting != 0);
2055                 peer->ibp_accepting--;
2056         }
2057
2058         if (peer->ibp_connecting != 0 ||
2059             peer->ibp_accepting != 0) {
2060                 /* another connection attempt under way (loopback?)... */
2061                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2062                 return;
2063         }
2064
2065         if (list_empty(&peer->ibp_conns)) {
2066                 /* Say when active connection can be re-attempted */
2067                 peer->ibp_reconnect_interval *= 2;
2068                 peer->ibp_reconnect_interval =
2069                         MAX(peer->ibp_reconnect_interval,
2070                             *kibnal_tunables.kib_min_reconnect_interval);
2071                 peer->ibp_reconnect_interval =
2072                         MIN(peer->ibp_reconnect_interval,
2073                             *kibnal_tunables.kib_max_reconnect_interval);
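                /* A fresh peer starts with interval 0, so the first failure
                 * backs off exactly kib_min_reconnect_interval seconds and
                 * each subsequent failure doubles that, capped at
                 * kib_max_reconnect_interval (e.g. 1s, 2s, 4s... with a
                 * 1 second minimum). */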
2074
2075                 peer->ibp_reconnect_time = jiffies +
2076                                            peer->ibp_reconnect_interval * HZ;
2077
2078                 /* Take peer's blocked transmits to complete with error */
2079                 list_add(&zombies, &peer->ibp_tx_queue);
2080                 list_del_init(&peer->ibp_tx_queue);
2081
2082                 if (kibnal_peer_active(peer) &&
2083                     (peer->ibp_persistence == 0)) {
2084                         /* failed connection attempt on non-persistent peer */
2085                         kibnal_unlink_peer_locked (peer);
2086                 }
2087
2088                 peer->ibp_error = error;
2089         } else {
2090                 /* Can't have blocked transmits if there are connections */
2091                 LASSERT (list_empty(&peer->ibp_tx_queue));
2092         }
2093
2094         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2095
2096         kibnal_peer_notify(peer);
2097
2098         if (list_empty (&zombies))
2099                 return;
2100
2101         CDEBUG (D_NETERROR, "Deleting messages for %s: connection failed\n",
2102                 libcfs_nid2str(peer->ibp_nid));
2103
2104         kibnal_txlist_done(&zombies, -EHOSTUNREACH);
2105 }
2106
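/* Reject a connection request, packing enough into priv_data for the peer to
 * decode why: bytes [0..3] hold IBNAL_MSG_MAGIC (little-endian), [4..5]
 * IBNAL_MSG_VERSION and [6] the reason code, so a deliberate rejection
 * (fatal, conn race, no resources) can be told apart from a garbled one. */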
2107 void
2108 kibnal_reject(cm_cep_handle_t cep, int why)
2109 {
2110         static cm_reject_data_t   rejs[3];
2111         cm_reject_data_t         *rej = &rejs[why];
2112
2113         LASSERT (why >= 0 && why < sizeof(rejs)/sizeof(rejs[0]));
2114
2115         /* If I wasn't so lazy, I'd initialise this only once; it's effectively
2116          * read-only */
2117         rej->reason = cm_rej_code_usr_rej;
2118         rej->priv_data[0] = (IBNAL_MSG_MAGIC) & 0xff;
2119         rej->priv_data[1] = (IBNAL_MSG_MAGIC >> 8) & 0xff;
2120         rej->priv_data[2] = (IBNAL_MSG_MAGIC >> 16) & 0xff;
2121         rej->priv_data[3] = (IBNAL_MSG_MAGIC >> 24) & 0xff;
2122         rej->priv_data[4] = (IBNAL_MSG_VERSION) & 0xff;
2123         rej->priv_data[5] = (IBNAL_MSG_VERSION >> 8) & 0xff;
2124         rej->priv_data[6] = why;
2125
2126         cm_reject(cep, rej);
2127 }
2128
2129 void
2130 kibnal_connreq_done(kib_conn_t *conn, int active, int status)
2131 {
2132         struct list_head   txs;
2133         kib_peer_t        *peer = conn->ibc_peer;
2134         unsigned long      flags;
2135         kib_tx_t          *tx;
2136
2137         CDEBUG(D_NET, "status %d\n", status);
2138
2139         /* Only the connd creates conns => single threaded */
2140         LASSERT (!in_interrupt());
2141         LASSERT (current == kibnal_data.kib_connd);
2142         LASSERT (conn->ibc_state < IBNAL_CONN_ESTABLISHED);
2143
2144         if (active) {
2145                 LASSERT (peer->ibp_connecting > 0);
2146         } else {
2147                 LASSERT (peer->ibp_accepting > 0);
2148         }
2149
2150         LIBCFS_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
2151         conn->ibc_connvars = NULL;
2152
2153         if (status != 0) {
2154                 /* failed to establish connection */
2155                 switch (conn->ibc_state) {
2156                 default:
2157                         LBUG();
2158
2159                 case IBNAL_CONN_ACTIVE_CHECK_REPLY:
2160                         /* got a connection reply but failed checks */
2161                         LASSERT (active);
2162                         kibnal_reject(conn->ibc_cep, IBNAL_REJECT_FATAL);
2163                         break;
2164
2165                 case IBNAL_CONN_ACTIVE_CONNECT:
2166                         LASSERT (active);
2167                         cm_cancel(conn->ibc_cep);
2168                         cfs_pause(cfs_time_seconds(1)/10);
2169                         /* cm_connect() failed immediately or
2170                          * callback returned failure */
2171                         break;
2172
2173                 case IBNAL_CONN_ACTIVE_ARP:
2174                         LASSERT (active);
2175                         /* ibat_get_ib_data() failed immediately 
2176                          * or callback returned failure */
2177                         break;
2178
2179                 case IBNAL_CONN_INIT:
2180                         break;
2181
2182                 case IBNAL_CONN_PASSIVE_WAIT:
2183                         LASSERT (!active);
2184                         /* cm_accept callback returned failure */
2185                         break;
2186                 }
2187
2188                 kibnal_peer_connect_failed(peer, active, status);
2189                 kibnal_conn_disconnected(conn);
2190                 return;
2191         }
2192
2193         /* connection established */
2194         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2195
2196         if (active) {
2197                 LASSERT(conn->ibc_state == IBNAL_CONN_ACTIVE_RTU);
2198         } else {
2199                 LASSERT(conn->ibc_state == IBNAL_CONN_PASSIVE_WAIT);
2200         }
2201
2202         conn->ibc_last_send = jiffies;
2203         kibnal_set_conn_state(conn, IBNAL_CONN_ESTABLISHED);
2204         kibnal_peer_alive(peer);
2205
2206         /* Add conn to peer's list and nuke any dangling conns from a different
2207          * peer instance... */
2208         kibnal_conn_addref(conn);               /* +1 ref for ibc_list */
2209         list_add(&conn->ibc_list, &peer->ibp_conns);
2210         kibnal_close_stale_conns_locked (peer, conn->ibc_incarnation);
2211
2212         if (!kibnal_peer_active(peer) ||        /* peer has been deleted */
2213             conn->ibc_comms_error != 0 ||       /* comms error */
2214             conn->ibc_disconnect) {             /* need to disconnect */
2215
2216                 /* start to shut down connection */
2217                 kibnal_close_conn_locked(conn, -ECONNABORTED);
2218
2219                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2220                 kibnal_peer_connect_failed(peer, active, -ECONNABORTED);
2221                 return;
2222         }
2223
2224         if (active)
2225                 peer->ibp_connecting--;
2226         else
2227                 peer->ibp_accepting--;
2228
2229         /* grab pending txs while I have the lock */
2230         list_add(&txs, &peer->ibp_tx_queue);
2231         list_del_init(&peer->ibp_tx_queue);
2232
2233         peer->ibp_reconnect_interval = 0;       /* OK to reconnect at any time */
2234
2235         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2236
2237         /* Schedule blocked txs */
2238         spin_lock (&conn->ibc_lock);
2239         while (!list_empty (&txs)) {
2240                 tx = list_entry (txs.next, kib_tx_t, tx_list);
2241                 list_del (&tx->tx_list);
2242
2243                 kibnal_queue_tx_locked (tx, conn);
2244         }
2245         spin_unlock (&conn->ibc_lock);
2246         kibnal_check_sends (conn);
2247
2248         /* schedule blocked rxs */
2249         kibnal_handle_early_rxs(conn);
2250 }
2251
2252 void
2253 kibnal_cm_callback(cm_cep_handle_t cep, cm_conn_data_t *cmdata, void *arg)
2254 {
2255         static cm_dreply_data_t drep;           /* just zeroed space */
2256
2257         kib_conn_t             *conn = (kib_conn_t *)arg;
2258         unsigned long           flags;
2259
2260         /* CAVEAT EMPTOR: tasklet context */
2261
2262         switch (cmdata->status) {
2263         default:
2264                 LBUG();
2265
2266         case cm_event_disconn_request:
2267                 /* IBNAL_CONN_ACTIVE_RTU:  gets closed in kibnal_connreq_done
2268                  * IBNAL_CONN_ESTABLISHED: I start it closing
2269                  * otherwise:              it's closing anyway */
2270                 cm_disconnect(conn->ibc_cep, NULL, &drep);
2271                 cm_cancel(conn->ibc_cep);
2272
2273                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2274                 LASSERT (!conn->ibc_disconnect);
2275                 conn->ibc_disconnect = 1;
2276
2277                 switch (conn->ibc_state) {
2278                 default:
2279                         LBUG();
2280
2281                 case IBNAL_CONN_ACTIVE_RTU:
2282                         /* kibnal_connreq_done is getting there; it'll see
2283                          * ibc_disconnect set... */
2284                         break;
2285
2286                 case IBNAL_CONN_ESTABLISHED:
2287                         /* kibnal_connreq_done got there already; get
2288                          * disconnect going... */
2289                         kibnal_close_conn_locked(conn, 0);
2290                         break;
2291
2292                 case IBNAL_CONN_DISCONNECT1:
2293                         /* kibnal_disconnect_conn is getting there; it'll see
2294                          * ibc_disconnect set... */
2295                         break;
2296
2297                 case IBNAL_CONN_DISCONNECT2:
2298                         /* kibnal_disconnect_conn got there already; complete
2299                          * the disconnect. */
2300                         kibnal_schedule_conn(conn);
2301                         break;
2302                 }
2303                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2304                 break;
2305
2306         case cm_event_disconn_timeout:
2307         case cm_event_disconn_reply:
2308                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2309                 LASSERT (conn->ibc_state == IBNAL_CONN_DISCONNECT2);
2310                 LASSERT (!conn->ibc_disconnect);
2311                 conn->ibc_disconnect = 1;
2312
2313                 /* kibnal_disconnect_conn sent the disconnect request. */
2314                 kibnal_schedule_conn(conn);
2315
2316                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2317                 break;
2318
2319         case cm_event_connected:
2320         case cm_event_conn_timeout:
2321         case cm_event_conn_reject:
2322                 LASSERT (conn->ibc_state == IBNAL_CONN_PASSIVE_WAIT);
2323                 conn->ibc_connvars->cv_conndata = *cmdata;
2324
2325                 kibnal_schedule_conn(conn);
2326                 break;
2327         }
2328
2329         kibnal_conn_decref(conn); /* lose my ref */
2330 }
2331
2332 void
2333 kibnal_check_passive_wait(kib_conn_t *conn)
2334 {
2335         int     rc;
2336
2337         switch (conn->ibc_connvars->cv_conndata.status) {
2338         default:
2339                 LBUG();
2340
2341         case cm_event_connected:
2342                 kibnal_conn_addref(conn); /* ++ ref for CM callback */
2343                 rc = kibnal_set_qp_state(conn, vv_qp_state_rts);
2344                 if (rc != 0)
2345                         conn->ibc_comms_error = rc;
2346                 /* connection _has_ been established; it's just that we've had
2347                  * an error immediately... */
2348                 kibnal_connreq_done(conn, 0, 0);
2349                 break;
2350
2351         case cm_event_conn_timeout:
2352                 kibnal_connreq_done(conn, 0, -ETIMEDOUT);
2353                 break;
2354
2355         case cm_event_conn_reject:
2356                 kibnal_connreq_done(conn, 0, -ECONNRESET);
2357                 break;
2358         }
2359 }
2360
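/* Passive connection establishment: unpack and sanity-check the peer's
 * CONNREQ (version, nids, queue depth, message size, frag count), find or
 * create the peer (tie-breaking connection races in favour of the higher
 * NID), take the QP through INIT->RTR with receives posted, then cm_accept()
 * with my CONNACK packed into the reply's private data. */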
2361 void
2362 kibnal_recv_connreq(cm_cep_handle_t *cep, cm_request_data_t *cmreq)
2363 {
2364         static kib_msg_t        txmsg;
2365         static kib_msg_t        rxmsg;
2366         static cm_reply_data_t  reply;
2367
2368         kib_conn_t         *conn = NULL;
2369         int                 rc = 0;
2370         int                 reason;
2371         int                 rxmsgnob;
2372         rwlock_t           *g_lock = &kibnal_data.kib_global_lock;
2373         kib_peer_t         *peer;
2374         kib_peer_t         *peer2;
2375         unsigned long       flags;
2376         kib_connvars_t     *cv;
2377         cm_return_t         cmrc;
2378         vv_return_t         vvrc;
2379
2380         /* I'm the connd executing in thread context
2381          * No concurrency problems with static data! */
2382         LASSERT (!in_interrupt());
2383         LASSERT (current == kibnal_data.kib_connd);
2384
2385         if (cmreq->sid != (__u64)(*kibnal_tunables.kib_service_number)) {
2386                 CERROR(LPX64" != IBNAL_SERVICE_NUMBER("LPX64")\n",
2387                        cmreq->sid, (__u64)(*kibnal_tunables.kib_service_number));
2388                 reason = IBNAL_REJECT_FATAL;
2389                 goto reject;
2390         }
2391
2392         /* copy into rxmsg to avoid alignment issues */
2393         rxmsgnob = MIN(cm_REQ_priv_data_len, sizeof(rxmsg));
2394         memcpy(&rxmsg, cmreq->priv_data, rxmsgnob);
2395
2396         rc = kibnal_unpack_msg(&rxmsg, 0, rxmsgnob);
2397         if (rc != 0) {
2398                 /* SILENT! kibnal_unpack_msg() complains if required */
2399                 reason = IBNAL_REJECT_FATAL;
2400                 goto reject;
2401         }
2402
2403         if (rxmsg.ibm_version != IBNAL_MSG_VERSION)
2404                 CWARN("Connection from %s: old protocol version 0x%x\n",
2405                       libcfs_nid2str(rxmsg.ibm_srcnid), rxmsg.ibm_version);
2406
2407         if (rxmsg.ibm_type != IBNAL_MSG_CONNREQ) {
2408                 CERROR("Unexpected connreq msg type: %x from %s\n",
2409                        rxmsg.ibm_type, libcfs_nid2str(rxmsg.ibm_srcnid));
2410                 reason = IBNAL_REJECT_FATAL;
2411                 goto reject;
2412         }
2413
2414         if (!lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
2415                                      rxmsg.ibm_dstnid)) {
2416                 CERROR("Can't accept %s: bad dst nid %s\n",
2417                        libcfs_nid2str(rxmsg.ibm_srcnid),
2418                        libcfs_nid2str(rxmsg.ibm_dstnid));
2419                 reason = IBNAL_REJECT_FATAL;
2420                 goto reject;
2421         }
2422
2423         if (rxmsg.ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
2424                 CERROR("Can't accept %s: incompatible queue depth %d (%d wanted)\n",
2425                        libcfs_nid2str(rxmsg.ibm_srcnid),
2426                        rxmsg.ibm_u.connparams.ibcp_queue_depth,
2427                        IBNAL_MSG_QUEUE_SIZE);
2428                 reason = IBNAL_REJECT_FATAL;
2429                 goto reject;
2430         }
2431
2432         if (rxmsg.ibm_u.connparams.ibcp_max_msg_size > IBNAL_MSG_SIZE) {
2433                 CERROR("Can't accept %s: message size %d too big (%d max)\n",
2434                        libcfs_nid2str(rxmsg.ibm_srcnid),
2435                        rxmsg.ibm_u.connparams.ibcp_max_msg_size,
2436                        IBNAL_MSG_SIZE);
2437                 reason = IBNAL_REJECT_FATAL;
2438                 goto reject;
2439         }
2440
2441         if (rxmsg.ibm_u.connparams.ibcp_max_frags > IBNAL_MAX_RDMA_FRAGS) {
2442                 CERROR("Can't accept %s: max frags %d too big (%d max)\n",
2443                        libcfs_nid2str(rxmsg.ibm_srcnid),
2444                        rxmsg.ibm_u.connparams.ibcp_max_frags,
2445                        IBNAL_MAX_RDMA_FRAGS);
2446                 reason = IBNAL_REJECT_FATAL;
2447                 goto reject;
2448         }
2449
2450         /* assume 'rxmsg.ibm_srcnid' is a new peer; create */
2451         rc = kibnal_create_peer (&peer, rxmsg.ibm_srcnid);
2452         if (rc != 0) {
2453                 CERROR("Can't create peer for %s\n",
2454                        libcfs_nid2str(rxmsg.ibm_srcnid));
2455                 reason = IBNAL_REJECT_NO_RESOURCES;
2456                 goto reject;
2457         }
2458
2459         write_lock_irqsave(g_lock, flags);
2460
2461         if (kibnal_data.kib_listen_handle == NULL) {
2462                 write_unlock_irqrestore(g_lock, flags);
2463
2464                 CWARN ("Shutdown has started, rejecting connreq from %s\n",
2465                        libcfs_nid2str(rxmsg.ibm_srcnid));
2466                 kibnal_peer_decref(peer);
2467                 reason = IBNAL_REJECT_FATAL;
2468                 goto reject;
2469         }
2470
2471         peer2 = kibnal_find_peer_locked(rxmsg.ibm_srcnid);
2472         if (peer2 != NULL) {
2473                 /* tie-break connection race in favour of the higher NID */
2474                 if (peer2->ibp_connecting != 0 &&
2475                     rxmsg.ibm_srcnid < kibnal_data.kib_ni->ni_nid) {
2476                         write_unlock_irqrestore(g_lock, flags);
2477
2478                         CWARN("Conn race with %s\n",
2479                               libcfs_nid2str(rxmsg.ibm_srcnid));
2480
2481                         kibnal_peer_decref(peer);
2482                         reason = IBNAL_REJECT_CONN_RACE;
2483                         goto reject;
2484                 }
2485
2486                 peer2->ibp_accepting++;
2487                 kibnal_peer_addref(peer2);
2488
2489                 write_unlock_irqrestore(g_lock, flags);
2490                 kibnal_peer_decref(peer);
2491                 peer = peer2;
2492         } else {
2493                 /* Brand new peer */
2494                 LASSERT (peer->ibp_accepting == 0);
2495                 peer->ibp_accepting = 1;
2496
2497                 kibnal_peer_addref(peer);
2498                 list_add_tail(&peer->ibp_list, kibnal_nid2peerlist(rxmsg.ibm_srcnid));
2499
2500                 write_unlock_irqrestore(g_lock, flags);
2501         }
2502
2503         conn = kibnal_create_conn(cep);
2504         if (conn == NULL) {
2505                 CERROR("Can't create conn for %s\n",
2506                        libcfs_nid2str(rxmsg.ibm_srcnid));
2507                 kibnal_peer_connect_failed(peer, 0, -ENOMEM);
2508                 kibnal_peer_decref(peer);
2509                 reason = IBNAL_REJECT_NO_RESOURCES;
2510                 goto reject;
2511         }
2512
2513         conn->ibc_version = rxmsg.ibm_version;
2514
2515         conn->ibc_peer = peer;              /* conn takes over my ref */
2516         conn->ibc_incarnation = rxmsg.ibm_srcstamp;
2517         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2518         conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
2519         LASSERT (conn->ibc_credits + conn->ibc_reserved_credits
2520                  <= IBNAL_RX_MSGS);
2521
2522         cv = conn->ibc_connvars;
2523
2524         cv->cv_txpsn          = cmreq->cep_data.start_psn;
2525         cv->cv_remote_qpn     = cmreq->cep_data.qpn;
2526         cv->cv_path           = cmreq->path_data.path;
2527         cv->cv_rnr_count      = cmreq->cep_data.rtr_retry_cnt;
2528         // XXX                  cmreq->cep_data.retry_cnt;
2529         cv->cv_port           = cmreq->cep_data.local_port_num;
2530
2531         vvrc = gid2gid_index(kibnal_data.kib_hca, cv->cv_port,
2532                              &cv->cv_path.sgid, &cv->cv_sgid_index);
2533         if (vvrc != vv_return_ok) {
2534                 CERROR("gid2gid_index failed for %s: %d\n",
2535                        libcfs_nid2str(rxmsg.ibm_srcnid), vvrc);
2536                 rc = -EIO;
2537                 reason = IBNAL_REJECT_FATAL;
2538                 goto reject;
2539         }
2540
2541         vvrc = pkey2pkey_index(kibnal_data.kib_hca, cv->cv_port,
2542                                cv->cv_path.pkey, &cv->cv_pkey_index);
2543         if (vvrc != vv_return_ok) {
2544                 CERROR("pkey2pkey_index failed for %s: %d\n",
2545                        libcfs_nid2str(rxmsg.ibm_srcnid), vvrc);
2546                 rc = -EIO;
2547                 reason = IBNAL_REJECT_FATAL;
2548                 goto reject;
2549         }
2550
2551         rc = kibnal_set_qp_state(conn, vv_qp_state_init);
2552         if (rc != 0) {
2553                 reason = IBNAL_REJECT_FATAL;
2554                 goto reject;
2555         }
2556
2557         rc = kibnal_post_receives(conn);
2558         if (rc != 0) {
2559                 CERROR("Can't post receives for %s\n",
2560                        libcfs_nid2str(rxmsg.ibm_srcnid));
2561                 reason = IBNAL_REJECT_FATAL;
2562                 goto reject;
2563         }
2564
2565         rc = kibnal_set_qp_state(conn, vv_qp_state_rtr);
2566         if (rc != 0) {
2567                 reason = IBNAL_REJECT_FATAL;
2568                 goto reject;
2569         }
2570
2571         memset(&reply, 0, sizeof(reply));
2572         reply.qpn                 = cv->cv_local_qpn;
2573         reply.qkey                = IBNAL_QKEY;
2574         reply.start_psn           = cv->cv_rxpsn;
2575         reply.arb_initiator_depth = IBNAL_ARB_INITIATOR_DEPTH;
2576         reply.arb_resp_res        = IBNAL_ARB_RESP_RES;
2577         reply.failover_accepted   = IBNAL_FAILOVER_ACCEPTED;
2578         reply.rnr_retry_count     = cv->cv_rnr_count;
2579         reply.targ_ack_delay      = kibnal_data.kib_hca_attrs.ack_delay;
2580
2581         /* setup txmsg... */
2582         memset(&txmsg, 0, sizeof(txmsg));
2583         kibnal_init_msg(&txmsg, IBNAL_MSG_CONNACK,
2584                         sizeof(txmsg.ibm_u.connparams));
2585         LASSERT (txmsg.ibm_nob <= cm_REP_priv_data_len);
2586         txmsg.ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
2587         txmsg.ibm_u.connparams.ibcp_max_msg_size = IBNAL_MSG_SIZE;
2588         txmsg.ibm_u.connparams.ibcp_max_frags = IBNAL_MAX_RDMA_FRAGS;
2589         kibnal_pack_msg(&txmsg, conn->ibc_version,
2590                         0, rxmsg.ibm_srcnid, rxmsg.ibm_srcstamp, 0);
2591
2592         /* ...and copy into reply to avoid alignment issues */
2593         memcpy(&reply.priv_data, &txmsg, txmsg.ibm_nob);
2594
2595         kibnal_set_conn_state(conn, IBNAL_CONN_PASSIVE_WAIT);
2596
2597         cmrc = cm_accept(conn->ibc_cep, &reply, NULL,
2598                          kibnal_cm_callback, conn);
2599
2600         if (cmrc == cm_stat_success)
2601                 return;                         /* callback has got my ref on conn */
2602
2603         /* back out state change (no callback happening) */
2604         kibnal_set_conn_state(conn, IBNAL_CONN_INIT);
2605         rc = -EIO;
2606         reason = IBNAL_REJECT_FATAL;
2607
2608  reject:
2609         CDEBUG(D_NET, "Rejecting connreq from %s\n",
2610                libcfs_nid2str(rxmsg.ibm_srcnid));
2611
2612         kibnal_reject(cep, reason);
2613
2614         if (conn != NULL) {
2615                 LASSERT (rc != 0);
2616                 kibnal_connreq_done(conn, 0, rc);
2617                 kibnal_conn_decref(conn);
2618         } else {
2619                 cm_destroy_cep(cep);
2620         }
2621 }
2622
2623 void
2624 kibnal_listen_callback(cm_cep_handle_t cep, cm_conn_data_t *data, void *arg)
2625 {
2626         cm_request_data_t  *cmreq = &data->data.request;
2627         kib_pcreq_t        *pcr;
2628         unsigned long       flags;
2629
2630         LASSERT (arg == NULL);
2631
2632         if (data->status != cm_event_conn_request) {
2633                 CERROR("status %d is not cm_event_conn_request\n",
2634                        data->status);
2635                 return;
2636         }
2637
2638         LIBCFS_ALLOC_ATOMIC(pcr, sizeof(*pcr));
2639         if (pcr == NULL) {
2640                 CERROR("Can't allocate passive connreq\n");
2641
2642                 kibnal_reject(cep, IBNAL_REJECT_NO_RESOURCES);
2643                 cm_destroy_cep(cep);
2644                 return;
2645         }
2646
2647         pcr->pcr_cep = cep;
2648         pcr->pcr_cmreq = *cmreq;
2649
2650         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
2651
2652         list_add_tail(&pcr->pcr_list, &kibnal_data.kib_connd_pcreqs);
2653         wake_up(&kibnal_data.kib_connd_waitq);
2654         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
2655 }
2656
2657
2658 void
2659 kibnal_active_connect_callback (cm_cep_handle_t cep, cm_conn_data_t *cd,
2660                                 void *arg)
2661 {
2662         /* CAVEAT EMPTOR: tasklet context */
2663         kib_conn_t       *conn = (kib_conn_t *)arg;
2664         kib_connvars_t   *cv = conn->ibc_connvars;
2665
2666         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_CONNECT);
2667         cv->cv_conndata = *cd;
2668
2669         kibnal_schedule_conn(conn);
2670         kibnal_conn_decref(conn);
2671 }
2672
2673 void
2674 kibnal_connect_conn (kib_conn_t *conn)
2675 {
2676         static cm_request_data_t  cmreq;
2677         static kib_msg_t          msg;
2678
2679         kib_connvars_t           *cv = conn->ibc_connvars;
2680         kib_peer_t               *peer = conn->ibc_peer;
2681         cm_return_t               cmrc;
2682
2683         /* Only called by connd => statics OK */
2684         LASSERT (!in_interrupt());
2685         LASSERT (current == kibnal_data.kib_connd);
2686         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_ARP);
2687
2688         memset(&cmreq, 0, sizeof(cmreq));
2689
2690         cmreq.sid = (__u64)(*kibnal_tunables.kib_service_number);
2691
2692         cmreq.cep_data.ca_guid              = kibnal_data.kib_hca_attrs.guid;
2693         cmreq.cep_data.qpn                  = cv->cv_local_qpn;
2694         cmreq.cep_data.retry_cnt            = *kibnal_tunables.kib_retry_cnt;
2695         cmreq.cep_data.rtr_retry_cnt        = *kibnal_tunables.kib_rnr_cnt;
2696         cmreq.cep_data.start_psn            = cv->cv_rxpsn;
2697         cmreq.cep_data.end_to_end_flow_ctrl = IBNAL_EE_FLOW_CNT;
2698         // XXX ack_timeout?
2699         // offered_resp_res
2700         // offered_initiator_depth
2701
2702         cmreq.path_data.subn_local  = IBNAL_LOCAL_SUB;
2703         cmreq.path_data.path        = cv->cv_path;
2704
2705         /* setup msg... */
2706         memset(&msg, 0, sizeof(msg));
2707         kibnal_init_msg(&msg, IBNAL_MSG_CONNREQ, sizeof(msg.ibm_u.connparams));
2708         LASSERT(msg.ibm_nob <= cm_REQ_priv_data_len);
2709         msg.ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
2710         msg.ibm_u.connparams.ibcp_max_msg_size = IBNAL_MSG_SIZE;
2711         msg.ibm_u.connparams.ibcp_max_frags = IBNAL_MAX_RDMA_FRAGS;
2712         kibnal_pack_msg(&msg, conn->ibc_version, 0, peer->ibp_nid, 0, 0);
2713
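        /* Single-shot fault injection for protocol compatibility testing:
         * bit 0 of ln_testprotocompat corrupts the message version, bit 1
         * swaps in the generic LNET magic; each bit is cleared after use so
         * only the next CONNREQ is affected. */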
2714         if (the_lnet.ln_testprotocompat != 0) {
2715                 /* single-shot proto check */
2716                 LNET_LOCK();
2717                 if ((the_lnet.ln_testprotocompat & 1) != 0) {
2718                         msg.ibm_version++;
2719                         the_lnet.ln_testprotocompat &= ~1;
2720                 }
2721                 if ((the_lnet.ln_testprotocompat & 2) != 0) {
2722                         msg.ibm_magic = LNET_PROTO_MAGIC;
2723                         the_lnet.ln_testprotocompat &= ~2;
2724                 }
2725                 LNET_UNLOCK();
2726         }
2727
2728         /* ...and copy into cmreq to avoid alignment issues */
2729         memcpy(&cmreq.priv_data, &msg, msg.ibm_nob);
2730
2731         CDEBUG(D_NET, "Connecting %p to %s\n", conn,
2732                libcfs_nid2str(peer->ibp_nid));
2733
2734         kibnal_conn_addref(conn);               /* ++ref for CM callback */
2735         kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_CONNECT);
2736
2737         cmrc = cm_connect(conn->ibc_cep, &cmreq,
2738                           kibnal_active_connect_callback, conn);
2739         if (cmrc == cm_stat_success) {
2740                 CDEBUG(D_NET, "connection REQ sent to %s\n",
2741                        libcfs_nid2str(peer->ibp_nid));
2742                 return;
2743         }
2744
2745         CERROR ("Connect %s failed: %d\n", libcfs_nid2str(peer->ibp_nid), cmrc);
2746         kibnal_conn_decref(conn);       /* drop callback's ref */
2747         kibnal_connreq_done(conn, 1, -EHOSTUNREACH);
2748 }
2749
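/* Retry an active connect immediately, ignoring the reconnect backoff: only
 * worthwhile if blocked txs are still queued and no other connection attempt
 * (active or passive) is in progress.  The old CEP is cancelled and
 * destroyed, and a fresh one drives the new attempt from ACTIVE_ARP. */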
2750 void
2751 kibnal_reconnect (kib_conn_t *conn, int why)
2752 {
2753         kib_peer_t      *peer = conn->ibc_peer;
2754         int              retry;
2755         unsigned long    flags;
2756         cm_return_t      cmrc;
2757         cm_cep_handle_t  cep;
2758
2759         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_CONNECT);
2760
2761         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2762
2763         LASSERT (peer->ibp_connecting > 0);          /* 'conn' at least */
2764
2765         /* retry connection if it's still needed and no other connection
2766          * attempts (active or passive) are in progress.
2767          * Immediate reconnect is required, so I don't even look at the
2768          * reconnection timeout etc */
2769
2770         retry = (!list_empty(&peer->ibp_tx_queue) &&
2771                  peer->ibp_connecting == 1 &&
2772                  peer->ibp_accepting == 0);
2773
2774         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2775
2776         if (!retry) {
2777                 kibnal_connreq_done(conn, 1, why);
2778                 return;
2779         }
2780
2781         cep = cm_create_cep(cm_cep_transp_rc);
2782         if (cep == NULL) {
2783                 CERROR("Can't create new CEP\n");
2784                 kibnal_connreq_done(conn, 1, -ENOMEM);
2785                 return;
2786         }
2787
2788         cmrc = cm_cancel(conn->ibc_cep);
2789         LASSERT (cmrc == cm_stat_success);
2790         cmrc = cm_destroy_cep(conn->ibc_cep);
2791         LASSERT (cmrc == cm_stat_success);
2792
2793         conn->ibc_cep = cep;
2794
2795         /* reuse conn; no need to peer->ibp_connecting++ */
2796         kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_ARP);
2797         kibnal_connect_conn(conn);
2798 }
2799
2800 void
2801 kibnal_check_connreply (kib_conn_t *conn)
2802 {
2803         static cm_rtu_data_t  rtu;
2804         static kib_msg_t      msg;
2805
2806         kib_connvars_t   *cv = conn->ibc_connvars;
2807         cm_reply_data_t  *reply = &cv->cv_conndata.data.reply;
2808         kib_peer_t       *peer = conn->ibc_peer;
2809         int               msgnob;
2810         cm_return_t       cmrc;
2811         unsigned long     flags;
2812         int               rc;
2813
2814         /* Only called by connd => statics OK */
2815         LASSERT (!in_interrupt());
2816         LASSERT (current == kibnal_data.kib_connd);
2817         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_CONNECT);
2818
2819         if (cv->cv_conndata.status == cm_event_conn_reply) {
2820                 cv->cv_remote_qpn = reply->qpn;
2821                 cv->cv_txpsn      = reply->start_psn;
2822                 /* XXX reply->targ_ack_delay */
2823                 cv->cv_rnr_count  = reply->rnr_retry_count;
2824
2825                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_CHECK_REPLY);
2826
2827                 /* copy into msg to avoid alignment issues */
2828                 msgnob = MIN(cm_REP_priv_data_len, sizeof(msg));
2829                 memcpy(&msg, &reply->priv_data, msgnob);
2830
2831                 rc = kibnal_unpack_msg(&msg, conn->ibc_version, msgnob);
2832                 if (rc != 0) {
2833                         CERROR("Can't unpack reply from %s\n",
2834                                libcfs_nid2str(peer->ibp_nid));
2835                         kibnal_connreq_done(conn, 1, rc);
2836                         return;
2837                 }
2838
2839                 if (msg.ibm_type != IBNAL_MSG_CONNACK) {
2840                         CERROR("Unexpected message type %d from %s\n",
2841                                msg.ibm_type, libcfs_nid2str(peer->ibp_nid));
2842                         kibnal_connreq_done(conn, 1, -EPROTO);
2843                         return;
2844                 }
2845
2846                 if (msg.ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
2847                         CERROR("%s has incompatible queue depth %d (%d wanted)\n",
2848                                libcfs_nid2str(peer->ibp_nid),
2849                                msg.ibm_u.connparams.ibcp_queue_depth,
2850                                IBNAL_MSG_QUEUE_SIZE);
2851                         kibnal_connreq_done(conn, 1, -EPROTO);
2852                         return;
2853                 }
2854
2855                 if (msg.ibm_u.connparams.ibcp_max_msg_size > IBNAL_MSG_SIZE) {
2856                         CERROR("%s max message size %d too big (%d max)\n",
2857                                libcfs_nid2str(peer->ibp_nid),
2858                                msg.ibm_u.connparams.ibcp_max_msg_size,
2859                                IBNAL_MSG_SIZE);
2860                         kibnal_connreq_done(conn, 1, -EPROTO);
2861                         return;
2862                 }
2863
2864                 if (msg.ibm_u.connparams.ibcp_max_frags > IBNAL_MAX_RDMA_FRAGS) {
2865                         CERROR("%s max frags %d too big (%d max)\n",
2866                                libcfs_nid2str(peer->ibp_nid),
2867                                msg.ibm_u.connparams.ibcp_max_frags,
2868                                IBNAL_MAX_RDMA_FRAGS);
2869                         kibnal_connreq_done(conn, 1, -EPROTO);
2870                         return;
2871                 }
2872
2873                 read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
2874                 if (lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
2875                                             msg.ibm_dstnid) &&
2876                     msg.ibm_dststamp == kibnal_data.kib_incarnation)
2877                         rc = 0;
2878                 else
2879                         rc = -ESTALE;
2880                 read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
2881                 if (rc != 0) {
2882                         CERROR("Stale connection reply from %s\n",
2883                                libcfs_nid2str(peer->ibp_nid));
2884                         kibnal_connreq_done(conn, 1, rc);
2885                         return;
2886                 }
2887
2888                 conn->ibc_incarnation = msg.ibm_srcstamp;
2889                 conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2890                 conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
2891                 LASSERT (conn->ibc_credits + conn->ibc_reserved_credits
2892                          <= IBNAL_RX_MSGS);
2893
2894                 rc = kibnal_post_receives(conn);
2895                 if (rc != 0) {
2896                         CERROR("Can't post receives for %s\n",
2897                                libcfs_nid2str(peer->ibp_nid));
2898                         kibnal_connreq_done(conn, 1, rc);
2899                         return;
2900                 }
2901
2902                 rc = kibnal_set_qp_state(conn, vv_qp_state_rtr);
2903                 if (rc != 0) {
2904                         kibnal_connreq_done(conn, 1, rc);
2905                         return;
2906                 }
2907
2908                 rc = kibnal_set_qp_state(conn, vv_qp_state_rts);
2909                 if (rc != 0) {
2910                         kibnal_connreq_done(conn, 1, rc);
2911                         return;
2912                 }
2913
2914                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_RTU);
2915                 kibnal_conn_addref(conn);       /* ++for CM callback */
2916
2917                 memset(&rtu, 0, sizeof(rtu));
2918                 cmrc = cm_accept(conn->ibc_cep, NULL, &rtu,
2919                                  kibnal_cm_callback, conn);
2920                 if (cmrc == cm_stat_success) {
2921                         /* Now I'm racing with disconnect signalled by
2922                          * kibnal_cm_callback */
2923                         kibnal_connreq_done(conn, 1, 0);
2924                         return;
2925                 }
2926
2927                 CERROR("cm_accept %s failed: %d\n",
2928                        libcfs_nid2str(peer->ibp_nid), cmrc);
2929                 /* Back out of RTU: no callback coming */
2930                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_CHECK_REPLY);
2931                 kibnal_conn_decref(conn);
2932                 kibnal_connreq_done(conn, 1, -EIO);
2933                 return;
2934         }
2935
2936         if (cv->cv_conndata.status == cm_event_conn_reject) {
2937
2938                 if (cv->cv_conndata.data.reject.reason == cm_rej_code_usr_rej) {
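                             /* the peer's reject private data carries magic,
                              * version and reason; decode it byte-by-byte
                              * since priv_data has no alignment guarantees */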
2939                         unsigned char *bytes =
2940                                 cv->cv_conndata.data.reject.priv_data;
2941                         int   magic   = (bytes[0]) |
2942                                         (bytes[1] << 8) |
2943                                         (bytes[2] << 16) |
2944                                         (bytes[3] << 24);
2945                         int   version = (bytes[4]) |
2946                                         (bytes[5] << 8);
2947                         int   why     = (bytes[6]);
2948
2949                         /* Expected proto/version: she just doesn't like me (or
2950                          * ran out of resources) */
2951                         if (magic == IBNAL_MSG_MAGIC &&
2952                             version == conn->ibc_version) {
2953                                 CERROR("conn -> %s rejected: fatal error %d\n",
2954                                        libcfs_nid2str(peer->ibp_nid), why);
2955
2956                                 if (why == IBNAL_REJECT_CONN_RACE)
2957                                         kibnal_reconnect(conn, -EALREADY);
2958                                 else
2959                                         kibnal_connreq_done(conn, 1, -ECONNREFUSED);
2960                                 return;
2961                         }
2962
2963                         /* Fail unless it's worth retrying with an old proto
2964                          * version */
2965                         if (!(magic == IBNAL_MSG_MAGIC &&
2966                               version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD &&
2967                               conn->ibc_version == IBNAL_MSG_VERSION)) {
2968                                 CERROR("conn -> %s rejected: bad protocol "
2969                                        "magic/ver %08x/%x why %d\n",
2970                                        libcfs_nid2str(peer->ibp_nid),
2971                                        magic, version, why);
2972
2973                                 kibnal_connreq_done(conn, 1, -ECONNREFUSED);
2974                                 return;
2975                         }
2976
2977                         conn->ibc_version = version;
2978                         CWARN ("Connection to %s refused: "
2979                                "retrying with old protocol version 0x%x\n",
2980                                libcfs_nid2str(peer->ibp_nid), version);
2981
2982                         kibnal_reconnect(conn, -ECONNREFUSED);
2983                         return;
2984                 } else if (cv->cv_conndata.data.reject.reason ==
2985                            cm_rej_code_stale_conn) {
2986
2987                         CWARN ("conn -> %s stale: retrying\n",
2988                                libcfs_nid2str(peer->ibp_nid));
2989
2990                         kibnal_reconnect(conn, -ESTALE);
2991                         return;
2992                 } else {
2993                         CDEBUG(D_NETERROR, "conn -> %s rejected: reason %d\n",
2994                                libcfs_nid2str(peer->ibp_nid),
2995                                cv->cv_conndata.data.reject.reason);
2996                         kibnal_connreq_done(conn, 1, -ECONNREFUSED);
2997                         return;
2998                 }
2999                 /* NOT REACHED */
3000         }
3001
3002         CDEBUG(D_NETERROR, "conn -> %s failed: %d\n",
3003                libcfs_nid2str(peer->ibp_nid), cv->cv_conndata.status);
3004         kibnal_connreq_done(conn, 1, -ECONNABORTED);
3005 }
3006
3007 void
3008 kibnal_arp_done (kib_conn_t *conn)
3009 {
3010         kib_peer_t           *peer = conn->ibc_peer;
3011         kib_connvars_t       *cv = conn->ibc_connvars;
3012         ibat_arp_data_t      *arp = &cv->cv_arp;
3013         ib_path_record_v2_t  *path = &cv->cv_path;
3014         vv_return_t           vvrc;
3015         int                   rc;
3016         unsigned long         flags;
3017
3018         LASSERT (!in_interrupt());
3019         LASSERT (current == kibnal_data.kib_connd);
3020         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_ARP);
3021         LASSERT (peer->ibp_arp_count > 0);
3022
3023         if (cv->cv_arprc != ibat_stat_ok) {
3024                 CDEBUG(D_NETERROR, "Arp %s @ %u.%u.%u.%u failed: %d\n",
3025                        libcfs_nid2str(peer->ibp_nid), HIPQUAD(peer->ibp_ip),
3026                        cv->cv_arprc);
3027                 goto failed;
3028         }
3029
3030         if ((arp->mask & IBAT_PRI_PATH_VALID) != 0) {
3031                 CDEBUG(D_NET, "Got valid path for %s\n",
3032                        libcfs_nid2str(peer->ibp_nid));
3033
3034                 *path = *arp->primary_path;
3035
3036                 vvrc = base_gid2port_num(kibnal_data.kib_hca, &path->sgid,
3037                                          &cv->cv_port);
3038                 if (vvrc != vv_return_ok) {
3039                         CWARN("base_gid2port_num failed for %s @ %u.%u.%u.%u: %d\n",
3040                               libcfs_nid2str(peer->ibp_nid),
3041                               HIPQUAD(peer->ibp_ip), vvrc);
3042                         goto failed;
3043                 }
3044
3045                 vvrc = gid2gid_index(kibnal_data.kib_hca, cv->cv_port,
3046                                      &path->sgid, &cv->cv_sgid_index);
3047                 if (vvrc != vv_return_ok) {
3048                         CWARN("gid2gid_index failed for %s @ %u.%u.%u.%u: %d\n",
3049                               libcfs_nid2str(peer->ibp_nid),
3050                               HIPQUAD(peer->ibp_ip), vvrc);
3051                         goto failed;
3052                 }
3053
3054                 vvrc = pkey2pkey_index(kibnal_data.kib_hca, cv->cv_port,
3055                                        path->pkey, &cv->cv_pkey_index);
3056                 if (vvrc != vv_return_ok) {
3057                         CWARN("pkey2pkey_index failed for %s @ %u.%u.%u.%u: %d\n",
3058                               libcfs_nid2str(peer->ibp_nid),
3059                               HIPQUAD(peer->ibp_ip), vvrc);
3060                         goto failed;
3061                 }
3062
3063                 path->mtu = IBNAL_IB_MTU;
3064
3065         } else if ((arp->mask & IBAT_LID_VALID) != 0) {
3066                 CWARN("Creating new path record for %s @ %u.%u.%u.%u\n",
3067                       libcfs_nid2str(peer->ibp_nid), HIPQUAD(peer->ibp_ip));
3068
3069                 cv->cv_pkey_index = IBNAL_PKEY_IDX;
3070                 cv->cv_sgid_index = IBNAL_SGID_IDX;
3071                 cv->cv_port = arp->local_port_num;
3072
3073                 memset(path, 0, sizeof(*path));
3074
3075                 vvrc = port_num2base_gid(kibnal_data.kib_hca, cv->cv_port,
3076                                          &path->sgid);
3077                 if (vvrc != vv_return_ok) {
3078                         CWARN("port_num2base_gid failed for %s @ %u.%u.%u.%u: %d\n",
3079               libcfs_nid2str(peer->ibp_nid),
3080                               HIPQUAD(peer->ibp_ip), vvrc);
3081                         goto failed;
3082                 }
3083
3084                 vvrc = port_num2base_lid(kibnal_data.kib_hca, cv->cv_port,
3085                                          &path->slid);
3086                 if (vvrc != vv_return_ok) {
3087                         CWARN("port_num2base_lid failed for %s @ %u.%u.%u.%u: %d\n",
3088               libcfs_nid2str(peer->ibp_nid),
3089                               HIPQUAD(peer->ibp_ip), vvrc);
3090                         goto failed;
3091                 }
3092
3093                 path->dgid          = arp->gid;
3094                 path->sl            = IBNAL_SERVICE_LEVEL;
3095                 path->dlid          = arp->lid;
3096                 path->mtu           = IBNAL_IB_MTU;
3097                 path->rate          = IBNAL_STATIC_RATE;
3098                 path->pkt_life_time = IBNAL_PKT_LIFETIME;
3099                 path->pkey          = IBNAL_PKEY;
3100                 path->traffic_class = IBNAL_TRAFFIC_CLASS;
3101         } else {
3102                 CWARN("Arp for %s @ %u.%u.%u.%u returned neither PATH nor LID\n",
3103                       libcfs_nid2str(peer->ibp_nid), HIPQUAD(peer->ibp_ip));
3104                 goto failed;
3105         }
3106
3107         rc = kibnal_set_qp_state(conn, vv_qp_state_init);
3108         if (rc != 0) {
3109                 kibnal_connreq_done(conn, 1, rc);
                     return;
3110         }
3111
3112         /* do the actual connection request */
3113         kibnal_connect_conn(conn);
3114         return;
3115
3116  failed:
3117         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
3118         peer->ibp_arp_count--;
3119         if (peer->ibp_arp_count == 0) {
3120                 /* final ARP attempt failed */
3121                 write_unlock_irqrestore(&kibnal_data.kib_global_lock,
3122                                         flags);
3123                 CDEBUG(D_NETERROR, "Arp %s @ %u.%u.%u.%u failed (final attempt)\n",
3124                        libcfs_nid2str(peer->ibp_nid), HIPQUAD(peer->ibp_ip));
3125         } else {
3126                 /* Retry ARP: bump ibp_connecting so tearing down this
3127                  * conn doesn't end the peer's connection attempt */
3128                 peer->ibp_connecting++;
3129                 write_unlock_irqrestore(&kibnal_data.kib_global_lock,
3130                                         flags);
3131                 CDEBUG(D_NETERROR, "Arp %s @ %u.%u.%u.%u failed (%d attempts left)\n",
3132                        libcfs_nid2str(peer->ibp_nid), HIPQUAD(peer->ibp_ip),
3133                        peer->ibp_arp_count);
3134
3135                 kibnal_schedule_peer_arp(peer);
3136         }
3137         kibnal_connreq_done(conn, 1, -ENETUNREACH);
3138 }
3139
3140 void
3141 kibnal_arp_callback (ibat_stat_t arprc, ibat_arp_data_t *arp_data, void *arg)
3142 {
3143         /* CAVEAT EMPTOR: tasklet context */
3144         kib_peer_t *peer;
3145         kib_conn_t *conn = (kib_conn_t *)arg;
3146
3147         LASSERT (conn != NULL);
3148         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_ARP);
3149
3150         peer = conn->ibc_peer;
3151
3152         if (arprc != ibat_stat_ok)
3153                 CDEBUG(D_NETERROR, "Arp %s at %u.%u.%u.%u failed: %d\n",
3154                        libcfs_nid2str(peer->ibp_nid), HIPQUAD(peer->ibp_ip), arprc);
3155         else
3156                 CDEBUG(D_NET, "Arp %s at %u.%u.%u.%u OK: LID %s PATH %s\n",
3157                        libcfs_nid2str(peer->ibp_nid), HIPQUAD(peer->ibp_ip),
3158                        (arp_data->mask & IBAT_LID_VALID) == 0 ? "invalid" : "valid",
3159                        (arp_data->mask & IBAT_PRI_PATH_VALID) == 0 ? "invalid" : "valid");
3160
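             /* stash the ARP result; connd will run kibnal_arp_done() on
              * this conn to do the next step in thread context */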
3161         conn->ibc_connvars->cv_arprc = arprc;
3162         if (arprc == ibat_stat_ok)
3163                 conn->ibc_connvars->cv_arp = *arp_data;
3164
3165         kibnal_schedule_conn(conn);
3166         kibnal_conn_decref(conn);
3167 }
3168
3169 void
3170 kibnal_arp_peer (kib_peer_t *peer)
3171 {
3172         cm_cep_handle_t  cep;
3173         kib_conn_t      *conn;
3174         int              ibatrc;
3175
3176         /* Only the connd does this (i.e. single threaded) */
3177         LASSERT (current == kibnal_data.kib_connd);
3178         LASSERT (peer->ibp_connecting != 0);
3179         LASSERT (peer->ibp_arp_count > 0);
3180
3181         cep = cm_create_cep(cm_cep_transp_rc);
3182         if (cep == NULL) {
3183                 CERROR ("Can't create cep for conn->%s\n",
3184                         libcfs_nid2str(peer->ibp_nid));
3185                 kibnal_peer_connect_failed(peer, 1, -ENOMEM);
3186                 return;
3187         }
3188
3189         conn = kibnal_create_conn(cep);
3190         if (conn == NULL) {
3191                 CERROR ("Can't allocate conn->%s\n",
3192                         libcfs_nid2str(peer->ibp_nid));
3193                 cm_destroy_cep(cep);
3194                 kibnal_peer_connect_failed(peer, 1, -ENOMEM);
3195                 return;
3196         }
3197
3198         conn->ibc_peer = peer;
3199         kibnal_peer_addref(peer);
3200
3201         kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_ARP);
3202
3203         ibatrc = ibat_get_ib_data(htonl(peer->ibp_ip), INADDR_ANY,
3204                                   ibat_paths_primary,
3205                                   &conn->ibc_connvars->cv_arp,
3206                                   kibnal_arp_callback, conn, 0);
3207         CDEBUG(D_NET, "ibatrc %d\n", ibatrc);
3208         switch (ibatrc) {
3209         default:
3210                 LBUG();
3211
3212         case ibat_stat_pending:
3213                 /* NB callback has my ref on conn */
3214                 break;
3215
3216         case ibat_stat_ok:
3217         case ibat_stat_error:
3218         case ibat_stat_timeout:
3219         case ibat_stat_not_found:
3220                 /* Immediate return (ARP cache hit or failure) == no callback. 
3221                  * Do the next stage directly... */
3222                 conn->ibc_connvars->cv_arprc = ibatrc;
3223                 kibnal_arp_done(conn);
3224                 kibnal_conn_decref(conn);
3225                 break;
3226         }
3227 }
3228
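     /* return non-zero if any tx on 'txs' has passed its deadline */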
3229 int
3230 kibnal_check_txs (kib_conn_t *conn, struct list_head *txs)
3231 {
3232         kib_tx_t          *tx;
3233         struct list_head  *ttmp;
3234         int                timed_out = 0;
3235
3236         spin_lock(&conn->ibc_lock);
3237
3238         list_for_each (ttmp, txs) {
3239                 tx = list_entry (ttmp, kib_tx_t, tx_list);
3240
3241                 if (txs == &conn->ibc_active_txs) {
3242                         LASSERT (!tx->tx_queued);
3243                         LASSERT (tx->tx_waiting || tx->tx_sending != 0);
3244                 } else {
3245                         LASSERT (tx->tx_queued);
3246                 }
3247
3248                 if (time_after_eq (jiffies, tx->tx_deadline)) {
3249                         timed_out = 1;
3250                         break;
3251                 }
3252         }
3253
3254         spin_unlock(&conn->ibc_lock);
3255         return timed_out;
3256 }
3257
3258 int
3259 kibnal_conn_timed_out (kib_conn_t *conn)
3260 {
3261         return  kibnal_check_txs(conn, &conn->ibc_tx_queue) ||
3262                 kibnal_check_txs(conn, &conn->ibc_tx_queue_rsrvd) ||
3263                 kibnal_check_txs(conn, &conn->ibc_tx_queue_nocred) ||
3264                 kibnal_check_txs(conn, &conn->ibc_active_txs);
3265 }
3266
3267 void
3268 kibnal_check_conns (int idx)
3269 {
3270         struct list_head  *peers = &kibnal_data.kib_peers[idx];
3271         struct list_head  *ptmp;
3272         kib_peer_t        *peer;
3273         kib_conn_t        *conn;
3274         struct list_head  *ctmp;
3275         unsigned long      flags;
3276
3277  again:
3278         /* NB. We expect to have a look at all the peers and not find any
3279          * rdmas to time out, so we just use a shared lock while we
3280          * take a look... */
3281         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
3282
3283         list_for_each (ptmp, peers) {
3284                 peer = list_entry (ptmp, kib_peer_t, ibp_list);
3285
3286                 list_for_each (ctmp, &peer->ibp_conns) {
3287                         conn = list_entry (ctmp, kib_conn_t, ibc_list);
3288
3289                         LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);
3290
3291                         /* In case we have enough credits to return via a
3292                          * NOOP, but there were no non-blocking tx descs
3293                          * free to do it last time... */
3294                         kibnal_check_sends(conn);
3295
3296                         if (!kibnal_conn_timed_out(conn))
3297                                 continue;
3298
3299                         /* Handle timeout by closing the whole connection.  We
3300                          * can only be sure RDMA activity has ceased once the
3301                          * QP has been modified. */
3302
3303                         kibnal_conn_addref(conn); /* 1 ref for me... */
3304
3305                         read_unlock_irqrestore(&kibnal_data.kib_global_lock,
3306                                                flags);
3307
3308                         CERROR("Timed out RDMA with %s\n",
3309                                libcfs_nid2str(peer->ibp_nid));
3310
3311                         kibnal_close_conn (conn, -ETIMEDOUT);
3312                         kibnal_conn_decref(conn); /* ...until here */
3313
3314                         /* start again now I've dropped the lock */
3315                         goto again;
3316                 }
3317         }
3318
3319         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
3320 }
3321
3322 void
3323 kibnal_disconnect_conn (kib_conn_t *conn)
3324 {
3325         static cm_drequest_data_t dreq;         /* just for the space */
3326
3327         cm_return_t    cmrc;
3328         unsigned long  flags;
3329
3330         LASSERT (!in_interrupt());
3331         LASSERT (current == kibnal_data.kib_connd);
3332
3333         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
3334
3335         if (conn->ibc_disconnect) {
3336                 /* Had the CM callback already */
3337                 write_unlock_irqrestore(&kibnal_data.kib_global_lock,
3338                                         flags);
3339                 kibnal_conn_disconnected(conn);
3340                 return;
3341         }
3342
3343         LASSERT (conn->ibc_state == IBNAL_CONN_DISCONNECT1);
3344
3345         /* active disconnect */
3346         cmrc = cm_disconnect(conn->ibc_cep, &dreq, NULL);
3347         if (cmrc == cm_stat_success) {
3348                 /* waiting for CM */
3349                 conn->ibc_state = IBNAL_CONN_DISCONNECT2;
3350                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
3351                 return;
3352         }
3353
3354         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
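             /* cm_disconnect() failed: cancel anything outstanding on the
              * CEP, then give an in-flight CM callback a moment to finish */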
3355
3356         cm_cancel(conn->ibc_cep);
3357         cfs_pause(cfs_time_seconds(1)/10);
3358
3359         if (!conn->ibc_disconnect)              /* CM callback will never happen now */
3360                 kibnal_conn_decref(conn);
3361
3362         LASSERT (atomic_read(&conn->ibc_refcount) > 0);
3363         LASSERT (conn->ibc_state == IBNAL_CONN_DISCONNECT1);
3364
3365         kibnal_conn_disconnected(conn);
3366 }
3367
3368 int
3369 kibnal_connd (void *arg)
3370 {
3371         wait_queue_t       wait;
3372         unsigned long      flags;
3373         kib_pcreq_t       *pcr;
3374         kib_conn_t        *conn;
3375         kib_peer_t        *peer;
3376         int                timeout;
3377         int                i;
3378         int                dropped_lock;
3379         int                peer_index = 0;
3380         unsigned long      deadline = jiffies;
3381
3382         cfs_daemonize ("kibnal_connd");
3383         cfs_block_allsigs ();
3384
3385         init_waitqueue_entry (&wait, current);
3386         kibnal_data.kib_connd = current;
3387
3388         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
3389
3390         while (!kibnal_data.kib_shutdown) {
3391
3392                 dropped_lock = 0;
3393
3394                 if (!list_empty (&kibnal_data.kib_connd_zombies)) {
3395                         conn = list_entry (kibnal_data.kib_connd_zombies.next,
3396                                            kib_conn_t, ibc_list);
3397                         list_del (&conn->ibc_list);
3398
3399                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
3400                         dropped_lock = 1;
3401
3402                         kibnal_destroy_conn(conn);
3403
3404                         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
3405                 }
3406
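                     /* handle queued incoming connection requests in
                      * thread context, where blocking is allowed */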
3407                 if (!list_empty (&kibnal_data.kib_connd_pcreqs)) {
3408                         pcr = list_entry(kibnal_data.kib_connd_pcreqs.next,
3409                                          kib_pcreq_t, pcr_list);
3410                         list_del(&pcr->pcr_list);
3411
3412                         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
3413                         dropped_lock = 1;
3414
3415                         kibnal_recv_connreq(pcr->pcr_cep, &pcr->pcr_cmreq);
3416                         LIBCFS_FREE(pcr, sizeof(*pcr));
3417
3418                         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
3419                 }
3420
3421                 if (!list_empty (&kibnal_data.kib_connd_peers)) {
3422                         peer = list_entry (kibnal_data.kib_connd_peers.next,
3423                                            kib_peer_t, ibp_connd_list);
3424
3425                         list_del_init (&peer->ibp_connd_list);
3426                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
3427                         dropped_lock = 1;
3428
3429                         kibnal_arp_peer (peer);
3430                         kibnal_peer_decref (peer);
3431
3432                         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
3433                 }
3434
3435                 if (!list_empty (&kibnal_data.kib_connd_conns)) {
3436                         conn = list_entry (kibnal_data.kib_connd_conns.next,
3437                                            kib_conn_t, ibc_list);
3438                         list_del (&conn->ibc_list);
3439
3440                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
3441                         dropped_lock = 1;
3442
3443                         switch (conn->ibc_state) {
3444                         default:
3445                                 LBUG();
3446
3447                         case IBNAL_CONN_ACTIVE_ARP:
3448                                 kibnal_arp_done(conn);
3449                                 break;
3450
3451                         case IBNAL_CONN_ACTIVE_CONNECT:
3452                                 kibnal_check_connreply(conn);
3453                                 break;
3454
3455                         case IBNAL_CONN_PASSIVE_WAIT:
3456                                 kibnal_check_passive_wait(conn);
3457                                 break;
3458
3459                         case IBNAL_CONN_DISCONNECT1:
3460                         case IBNAL_CONN_DISCONNECT2:
3461                                 kibnal_disconnect_conn(conn);
3462                                 break;
3463                         }
3464                         kibnal_conn_decref(conn);
3465
3466                         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
3467                 }
3468
3469                 /* careful with the jiffy wrap... */
3470                 timeout = (int)(deadline - jiffies);
3471                 if (timeout <= 0) {
3472                         const int n = 4;
3473                         const int p = 1;
3474                         int       chunk = kibnal_data.kib_peer_hash_size;
3475
3476                         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
3477                         dropped_lock = 1;
3478
3479                         /* Time to check for RDMA timeouts on a few more
3480                          * peers: I do checks every 'p' seconds on a
3481                          * proportion of the peer table and I need to check
3482                          * every connection 'n' times within a timeout
3483                          * interval, to ensure I detect a timeout on any
3484                          * connection within (n+1)/n times the timeout
3485                          * interval. */
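                             /* e.g. with 101 hash buckets and a 50 second
                              * timeout, chunk = (101 * 4 * 1) / 50 = 8
                              * buckets checked per pass */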
3486
3487                         if (*kibnal_tunables.kib_timeout > n * p)
3488                                 chunk = (chunk * n * p) /
3489                                         *kibnal_tunables.kib_timeout;
3490                         if (chunk == 0)
3491                                 chunk = 1;
3492
3493                         for (i = 0; i < chunk; i++) {
3494                                 kibnal_check_conns (peer_index);
3495                                 peer_index = (peer_index + 1) %
3496                                              kibnal_data.kib_peer_hash_size;
3497                         }
3498
3499                         deadline += p * HZ;
3500                         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
3501                 }
3502
3503                 if (dropped_lock)
3504                         continue;
3505
3506                 /* Nothing to do for 'timeout' */
3507                 set_current_state (TASK_INTERRUPTIBLE);
3508                 add_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
3509                 spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
3510
3511                 schedule_timeout (timeout);
3512
3513                 set_current_state (TASK_RUNNING);
3514                 remove_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
3515                 spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
3516         }
3517
3518         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
3519
3520         kibnal_thread_fini ();
3521         return (0);
3522 }
3523
3524 void
3525 kibnal_async_callback(vv_event_record_t ev)
3526 {
3527         CERROR("type: %d, port: %d, data: "LPX64"\n",
3528                ev.event_type, ev.port_num, ev.type.data);
3529 }
3530
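     /* CQ event callback: just flag that work is ready and wake a
      * scheduler; the actual completion polling happens in
      * kibnal_scheduler() */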
3531 void
3532 kibnal_cq_callback (unsigned long unused_context)
3533 {
3534         unsigned long    flags;
3535
3536         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
3537         kibnal_data.kib_ready = 1;
3538         wake_up(&kibnal_data.kib_sched_waitq);
3539         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
3540 }
3541
3542 int
3543 kibnal_scheduler(void *arg)
3544 {
3545         long            id = (long)arg;
3546         wait_queue_t    wait;
3547         char            name[16];
3548         vv_wc_t         wc;
3549         vv_return_t     vvrc;
3550         vv_return_t     vvrc2;
3551         unsigned long   flags;
3552         kib_rx_t       *rx;
3553         __u64           rxseq = 0;
3554         int             busy_loops = 0;
3555
3556         snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
3557         cfs_daemonize(name);
3558         cfs_block_allsigs();
3559
3560         init_waitqueue_entry(&wait, current);
3561
3562         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
3563
3564         while (!kibnal_data.kib_shutdown) {
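                     /* drop the lock and yield periodically so a busy
                      * scheduler doesn't monopolise the CPU */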
3565                 if (busy_loops++ >= IBNAL_RESCHED) {
3566                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
3567                                                flags);
3568
3569                         our_cond_resched();
3570                         busy_loops = 0;
3571
3572                         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
3573                 }
3574
3575                 if (kibnal_data.kib_ready &&
3576                     !kibnal_data.kib_checking_cq) {
3577                         /* take ownership of completion polling */
3578                         kibnal_data.kib_checking_cq = 1;
3579                         /* Assume I'll exhaust the CQ */
3580                         kibnal_data.kib_ready = 0;
3581                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
3582                                                flags);
3583
3584                         vvrc = vv_poll_for_completion(kibnal_data.kib_hca,
3585                                                       kibnal_data.kib_cq, &wc);
3586                         if (vvrc == vv_return_err_cq_empty) {
3587                                 vvrc2 = vv_request_completion_notification(
3588                                         kibnal_data.kib_hca,
3589                                         kibnal_data.kib_cq,
3590                                         vv_next_solicit_unsolicit_event);
3591                                 LASSERT (vvrc2 == vv_return_ok);
3592                         }
3593
3594                         if (vvrc == vv_return_ok &&
3595                             kibnal_wreqid2type(wc.wr_id) == IBNAL_WID_RX) {
3596                                 rx = (kib_rx_t *)kibnal_wreqid2ptr(wc.wr_id);
3597
3598                                 /* Grab the RX sequence number NOW before
3599                                  * anyone else can get an RX completion */
3600                                 rxseq = rx->rx_conn->ibc_rxseq++;
3601                         }
3602
3603                         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
3604                         /* give up ownership of completion polling */
3605                         kibnal_data.kib_checking_cq = 0;
3606
3607                         if (vvrc == vv_return_err_cq_empty)
3608                                 continue;
3609
3610                         LASSERT (vvrc == vv_return_ok);
3611                         /* Assume there's more: get another scheduler to check
3612                          * while I handle this completion... */
3613
3614                         kibnal_data.kib_ready = 1;
3615                         wake_up(&kibnal_data.kib_sched_waitq);
3616
3617                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
3618                                                flags);
3619
3620                         switch (kibnal_wreqid2type(wc.wr_id)) {
3621                         case IBNAL_WID_RX:
3622                                 kibnal_rx_complete(
3623                                         (kib_rx_t *)kibnal_wreqid2ptr(wc.wr_id),
3624                                         wc.completion_status,
3625                                         wc.num_bytes_transfered,
3626                                         rxseq);
3627                                 break;
3628
3629                         case IBNAL_WID_TX:
3630                                 kibnal_tx_complete(
3631                                         (kib_tx_t *)kibnal_wreqid2ptr(wc.wr_id),
3632                                         wc.completion_status);
3633                                 break;
3634
3635                         case IBNAL_WID_RDMA:
3636                                 /* We only get RDMA completion notification if
3637                                  * it fails.  So we just ignore them completely
3638                                  * because...
3639                                  *
3640                                  * 1) If an RDMA fails, all subsequent work
3641                                  * items, including the final SEND will fail
3642                                  * too, so I'm still guaranteed to notice that
3643                                  * this connection is hosed.
3644                                  *
3645                                  * 2) It's positively dangerous to look inside
3646                                  * the tx descriptor obtained from an RDMA work
3647                                  * item.  As soon as I drop the kib_sched_lock,
3648                                  * I give a scheduler on another CPU a chance
3649                                  * to get the final SEND completion, so the tx
3650                                  * descriptor can get freed as I inspect it. */
3651                                 CDEBUG(D_NETERROR, "RDMA failed: %d\n",
3652                                        wc.completion_status);
3653                                 break;
3654
3655                         default:
3656                                 LBUG();
3657                         }
3658
3659                         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
3660                         continue;
3661                 }
3662
3663                 /* Nothing to do; sleep... */
3664
3665                 set_current_state(TASK_INTERRUPTIBLE);
3666                 add_wait_queue_exclusive(&kibnal_data.kib_sched_waitq, &wait);
3667                 spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
3668                                        flags);
3669
3670                 schedule();
3671
3672                 remove_wait_queue(&kibnal_data.kib_sched_waitq, &wait);
3673                 set_current_state(TASK_RUNNING);
3674                 spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
3675         }
3676
3677         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
3678
3679         kibnal_thread_fini();
3680         return (0);
3681 }