1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2004 Cluster File Systems, Inc.
5 * Author: Eric Barton <eric@bartonsoftware.com>
7 * This file is part of Lustre, http://www.lustre.org.
9 * Lustre is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Lustre is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Lustre; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24 #include "openibnal.h"
27 * LIB functions follow
/* Hand 'tx' to the scheduler: append it to koibnal_data.koib_sched_txq and
 * wake anything sleeping on koib_sched_waitq, all under koib_sched_lock with
 * interrupts saved.  koibnal_tx_done() uses this to defer unmapping work
 * that can't be done in IRQ context. */
31 koibnal_schedule_tx_done (koib_tx_t *tx)
35 spin_lock_irqsave (&koibnal_data.koib_sched_lock, flags);
37 list_add_tail(&tx->tx_list, &koibnal_data.koib_sched_txq);
38 wake_up (&koibnal_data.koib_sched_waitq);
40 spin_unlock_irqrestore(&koibnal_data.koib_sched_lock, flags);
/* Finish with 'tx' and return it to an idle pool.
 *
 * Any memory mapping still held by the tx is released first; where that
 * can't be done in interrupt context the tx is handed to the scheduler via
 * koibnal_schedule_tx_done() instead.  Up to 2 attached libmsgs are then
 * finalised with PTL_OK when tx_status == 0 and PTL_FAIL otherwise, the ref
 * on tx_conn (if any) is dropped, and the descriptor is put back on
 * koib_idle_nblk_txs or koib_idle_txs under koib_tx_lock.
 *
 * Must not be called while sends are still in flight (tx_sending != 0) or
 * while the tx is waiting for a passive RDMA completion; both are asserted.
 * NOTE(review): the test that picks between the two idle lists is not
 * visible in this copy of the function. */
44 koibnal_tx_done (koib_tx_t *tx)
/* portals completion code handed to lib_finalize() below */
46 ptl_err_t ptlrc = (tx->tx_status == 0) ? PTL_OK : PTL_FAIL;
51 LASSERT (tx->tx_sending == 0); /* mustn't be awaiting callback */
52 LASSERT (!tx->tx_passive_rdma_wait); /* mustn't be on ibc_rdma_queue */
54 switch (tx->tx_mapped) {
58 case KOIB_TX_UNMAPPED:
63 /* can't deregister memory in IRQ context... */
64 koibnal_schedule_tx_done(tx);
67 rc = ib_memory_deregister(tx->tx_md.md_handle.mr);
69 tx->tx_mapped = KOIB_TX_UNMAPPED;
73 case KOIB_TX_MAPPED_FMR:
/* only a failed tx forces an FMR pool flush (below), so only that case
 * has to be moved out of IRQ context */
74 if (in_interrupt() && tx->tx_status != 0) {
75 /* can't flush FMRs in IRQ context... */
76 koibnal_schedule_tx_done(tx);
80 rc = ib_fmr_deregister(tx->tx_md.md_handle.fmr);
83 if (tx->tx_status != 0)
84 ib_fmr_pool_force_flush(koibnal_data.koib_fmr_pool);
85 tx->tx_mapped = KOIB_TX_UNMAPPED;
90 for (i = 0; i < 2; i++) {
91 /* tx may have up to 2 libmsgs to finalise */
92 if (tx->tx_libmsg[i] == NULL)
95 lib_finalize (&koibnal_lib, NULL, tx->tx_libmsg[i], ptlrc);
96 tx->tx_libmsg[i] = NULL;
99 if (tx->tx_conn != NULL) {
100 koibnal_put_conn (tx->tx_conn);
105 tx->tx_passive_rdma = 0;
108 spin_lock_irqsave (&koibnal_data.koib_tx_lock, flags);
111 list_add_tail (&tx->tx_list, &koibnal_data.koib_idle_nblk_txs);
113 list_add_tail (&tx->tx_list, &koibnal_data.koib_idle_txs);
/* koibnal_get_idle_tx() may be sleeping until koib_idle_txs is non-empty */
114 wake_up (&koibnal_data.koib_idle_tx_waitq);
117 spin_unlock_irqrestore (&koibnal_data.koib_tx_lock, flags);
/* Take a tx descriptor off an idle list.
 *
 * A "normal" descriptor from koib_idle_txs is preferred.  Failing that the
 * code either dips into the reserve pool (koib_idle_nblk_txs), logging an
 * error if that is empty too, or drops koib_tx_lock and sleeps until a
 * normal descriptor is freed or koib_shutdown is set.
 * NOTE(review): the tests on 'may_block' selecting between those two paths,
 * and the return statement, are not visible in this copy; callers that pass
 * may_block == 0 do handle allocation failure, so presumably NULL comes back
 * when the reserve pool is exhausted.
 *
 * The descriptor handed out has been removed from its idle list, carries a
 * fresh passive RDMA cookie, and is asserted to be in its pristine state. */
121 koibnal_get_idle_tx (int may_block)
124 koib_tx_t *tx = NULL;
127 spin_lock_irqsave (&koibnal_data.koib_tx_lock, flags);
129 /* "normal" descriptor is free */
130 if (!list_empty (&koibnal_data.koib_idle_txs)) {
131 tx = list_entry (koibnal_data.koib_idle_txs.next,
137 /* may dip into reserve pool */
138 if (list_empty (&koibnal_data.koib_idle_nblk_txs)) {
139 CERROR ("reserved tx desc pool exhausted\n");
143 tx = list_entry (koibnal_data.koib_idle_nblk_txs.next,
148 /* block for idle tx */
149 spin_unlock_irqrestore (&koibnal_data.koib_tx_lock, flags);
/* the lock was dropped above, so the wait condition is re-evaluated
 * each time koibnal_tx_done() wakes koib_idle_tx_waitq */
151 wait_event (koibnal_data.koib_idle_tx_waitq,
152 !list_empty (&koibnal_data.koib_idle_txs) ||
153 koibnal_data.koib_shutdown);
157 list_del (&tx->tx_list);
159 /* Allocate a new passive RDMA completion cookie. It might
160 * not be needed, but we've got a lock right now and we're
161 * unlikely to wrap... */
162 tx->tx_passive_rdma_cookie = koibnal_data.koib_next_tx_cookie++;
/* an idle descriptor must have been fully reset by koibnal_tx_done() */
164 LASSERT (tx->tx_mapped == KOIB_TX_UNMAPPED);
165 LASSERT (tx->tx_nsp == 0);
166 LASSERT (tx->tx_sending == 0);
167 LASSERT (tx->tx_status == 0);
168 LASSERT (tx->tx_conn == NULL);
169 LASSERT (!tx->tx_passive_rdma);
170 LASSERT (!tx->tx_passive_rdma_wait);
171 LASSERT (tx->tx_libmsg[0] == NULL);
172 LASSERT (tx->tx_libmsg[1] == NULL);
175 spin_unlock_irqrestore (&koibnal_data.koib_tx_lock, flags);
/* Report the distance from this node to 'nid' through '*dist'.
 * The only test visible here is whether 'nid' is this node's own NID
 * (nal->libnal_ni.ni_pid.nid).
 * NOTE(review): the values stored in '*dist' and the return value are not
 * visible in this copy of the function. */
181 koibnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
183 /* I would guess that if koibnal_get_peer (nid) == NULL,
184 and we're not routing, then 'nid' is very distant :) */
185 if ( nal->libnal_ni.ni_pid.nid == nid ) {
/* Handle a peer's RDMA completion message for connection 'conn'.
 *
 * Searches conn->ibc_rdma_queue, under ibc_lock, for the tx whose
 * tx_passive_rdma_cookie equals 'cookie'.  On a match the tx is taken off
 * the queue, tx_passive_rdma_wait is cleared, 'status' is recorded in
 * tx_status and koibnal_tx_done() is called on it.  If nothing matches, the
 * completion is logged as unmatched (late?).
 * NOTE(review): per the comment below, koibnal_tx_done() should only run
 * when this call made the tx idle (tx_sending == 0); the test on 'idle'
 * itself is not visible in this copy. */
195 koibnal_complete_passive_rdma(koib_conn_t *conn, __u64 cookie, int status)
197 struct list_head *ttmp;
201 spin_lock_irqsave (&conn->ibc_lock, flags);
203 list_for_each (ttmp, &conn->ibc_rdma_queue) {
204 koib_tx_t *tx = list_entry(ttmp, koib_tx_t, tx_list);
/* everything on ibc_rdma_queue is a passive RDMA awaiting completion */
206 LASSERT (tx->tx_passive_rdma);
207 LASSERT (tx->tx_passive_rdma_wait);
209 if (tx->tx_passive_rdma_cookie != cookie)
212 CDEBUG(D_NET, "Complete %p "LPD64"\n", tx, cookie);
214 list_del (&tx->tx_list);
216 tx->tx_passive_rdma_wait = 0;
/* idle only if no send work items are still awaiting their callback */
217 idle = (tx->tx_sending == 0);
219 tx->tx_status = status;
221 spin_unlock_irqrestore (&conn->ibc_lock, flags);
223 /* I could be racing with tx callbacks. It's whoever
224 * _makes_ tx idle that frees it */
226 koibnal_tx_done (tx);
230 spin_unlock_irqrestore (&conn->ibc_lock, flags);
232 CERROR ("Unmatched (late?) RDMA completion "LPX64" from "LPX64"\n",
233 cookie, conn->ibc_peer->ibp_nid);
/* Post receive buffer 'rx' on its connection's QP.
 *
 * Builds a one-entry scatter list over the rx message buffer
 * (OPENIBNAL_MSG_SIZE bytes at rx_vaddr, keyed by the conn's rx pages lkey)
 * and a receive work request whose id is the rx pointer itself, which is how
 * koibnal_rx_callback() finds the rx again.  After a post, one more credit
 * is recorded in ibc_outstanding_credits and koibnal_check_sends() is given
 * the chance to return it.
 * NOTE(review): presumably the credit accounting is guarded by 'do_credits'
 * and the error handling by the ib_receive() result; neither test is visible
 * in this copy.
 *
 * On failure the connection is closed if it was still established (otherwise
 * the error is only logged at debug level) and a ref on 'conn' is dropped. */
237 koibnal_post_rx (koib_rx_t *rx, int do_credits)
239 koib_conn_t *conn = rx->rx_conn;
243 rx->rx_gl = (struct ib_gather_scatter) {
244 .address = rx->rx_vaddr,
245 .length = OPENIBNAL_MSG_SIZE,
246 .key = conn->ibc_rx_pages->oibp_lkey,
249 rx->rx_sp = (struct ib_receive_param) {
/* the completion entry echoes this id back; see koibnal_rx_callback() */
250 .work_request_id = (__u64)(unsigned long)rx,
251 .scatter_list = &rx->rx_gl,
252 .num_scatter_entries = 1,
253 .device_specific = NULL,
257 LASSERT (conn->ibc_state >= OPENIBNAL_CONN_ESTABLISHED);
258 LASSERT (!rx->rx_posted);
262 if (conn->ibc_state != OPENIBNAL_CONN_ESTABLISHED)
265 rc = ib_receive (conn->ibc_qp, &rx->rx_sp, 1);
269 spin_lock_irqsave(&conn->ibc_lock, flags);
270 conn->ibc_outstanding_credits++;
271 spin_unlock_irqrestore(&conn->ibc_lock, flags);
273 koibnal_check_sends(conn);
/* an error on a conn that is already going down is expected, so it is
 * only worth a debug message */
278 if (conn->ibc_state == OPENIBNAL_CONN_ESTABLISHED) {
279 CERROR ("Error posting receive -> "LPX64": %d\n",
280 conn->ibc_peer->ibp_nid, rc);
281 koibnal_close_conn (rx->rx_conn, rc);
283 CDEBUG (D_NET, "Error posting receive -> "LPX64": %d\n",
284 conn->ibc_peer->ibp_nid, rc);
288 koibnal_put_conn (conn);
/* Compute the message checksum over 'nob' bytes at 'ptr'.
 * Each step rotates the running 32-bit sum left by one bit and adds the
 * next element read through 'c'.
 * NOTE(review): the declarations of 'sum' and 'c', the loop header and the
 * return statement are not visible in this copy of the function. */
292 __u32 koibnal_cksum (void *ptr, int nob)
298 sum = ((sum << 1) | (sum >> 31)) + *c++;
/* Completion queue handler for receives.
 *
 * Recovers the rx from the work request id, then validates the message:
 * completion status, minimum length, magic (accepting the byte-swapped magic
 * of an opposite-endian peer), version, byte count and checksum.  Credits
 * carried in the message are added to conn->ibc_credits and
 * koibnal_check_sends() is called to use them.
 *
 * Dispatch by message type:
 *  - NOOP: the rx is simply reposted.
 *  - PUT_DONE / GET_DONE: the matching passive RDMA is completed via
 *    koibnal_complete_passive_rdma() and the rx is reposted.
 *  - IMMEDIATE, PUT_RDMA, GET_RDMA: length-checked here; these appear to be
 *    the messages queued on koib_sched_rxq for koibnal_rx() to handle in
 *    thread context.
 *  - anything else is an error.
 *
 * On any failure the connection is closed with -ECONNABORTED and the rx's
 * ref on the conn is dropped without reposting it.
 * NOTE(review): the goto/break/return statements linking these steps are
 * not visible in this copy of the function. */
305 koibnal_rx_callback (struct ib_cq *cq, struct ib_cq_entry *e, void *arg)
307 koib_rx_t *rx = (koib_rx_t *)((unsigned long)e->work_request_id);
308 koib_msg_t *msg = rx->rx_msg;
309 koib_conn_t *conn = rx->rx_conn;
310 int nob = e->bytes_transferred;
/* size of the fixed message header that precedes the type-specific body */
311 const int base_nob = offsetof(koib_msg_t, oibm_u);
317 __u32 computed_cksum;
320 CDEBUG (D_NET, "rx %p conn %p\n", rx, conn);
321 LASSERT (rx->rx_posted);
325 /* receives complete with error in any case after we've started
327 if (conn->ibc_state >= OPENIBNAL_CONN_DEATHROW)
330 /* We don't post receives until the conn is established */
331 LASSERT (conn->ibc_state == OPENIBNAL_CONN_ESTABLISHED);
333 if (e->status != IB_COMPLETION_STATUS_SUCCESS) {
334 CERROR("Rx from "LPX64" failed: %d\n",
335 conn->ibc_peer->ibp_nid, e->status);
/* must have at least a whole header before any field can be trusted */
339 if (nob < base_nob) {
340 CERROR ("Short rx from "LPX64": %d\n",
341 conn->ibc_peer->ibp_nid, nob);
345 /* Receiver does any byte flipping if necessary... */
347 if (msg->oibm_magic == OPENIBNAL_MSG_MAGIC) {
350 if (msg->oibm_magic != __swab32(OPENIBNAL_MSG_MAGIC)) {
351 CERROR ("Unrecognised magic: %08x from "LPX64"\n",
352 msg->oibm_magic, conn->ibc_peer->ibp_nid);
/* opposite-endian peer: swab the multi-byte header fields; type and
 * credits are single bytes (asserted below) so they need no swabbing */
356 __swab16s (&msg->oibm_version);
357 LASSERT (sizeof(msg->oibm_type) == 1);
358 LASSERT (sizeof(msg->oibm_credits) == 1);
361 if (msg->oibm_version != OPENIBNAL_MSG_VERSION) {
362 CERROR ("Incompatible msg version %d (%d expected)\n",
363 msg->oibm_version, OPENIBNAL_MSG_VERSION);
/* the length the sender claims must match what actually arrived */
368 if (nob != msg->oibm_nob) {
369 CERROR ("Unexpected # bytes %d (%d expected)\n", nob, msg->oibm_nob);
/* NOTE(review): koibnal_check_sends() zeroes oibm_cksum before computing
 * the sender's checksum; presumably the field is zeroed here too before
 * recomputing, on a line not visible in this copy. */
373 msg_cksum = le32_to_cpu(msg->oibm_cksum);
375 computed_cksum = koibnal_cksum (msg, nob);
377 if (msg_cksum != computed_cksum) {
378 CERROR ("Checksum failure %d: (%d expected)\n",
379 computed_cksum, msg_cksum);
382 CDEBUG(D_NET, "cksum %x, nob %d\n", computed_cksum, nob);
385 /* Have I received credits that will let me send? */
386 credits = msg->oibm_credits;
388 spin_lock_irqsave(&conn->ibc_lock, flags);
389 conn->ibc_credits += credits;
390 spin_unlock_irqrestore(&conn->ibc_lock, flags);
392 koibnal_check_sends(conn);
395 switch (msg->oibm_type) {
396 case OPENIBNAL_MSG_NOOP:
/* nothing to deliver; just recycle the buffer */
397 koibnal_post_rx (rx, 1);
400 case OPENIBNAL_MSG_IMMEDIATE:
401 if (nob < base_nob + sizeof (koib_immediate_msg_t)) {
402 CERROR ("Short IMMEDIATE from "LPX64": %d\n",
403 conn->ibc_peer->ibp_nid, nob);
408 case OPENIBNAL_MSG_PUT_RDMA:
409 case OPENIBNAL_MSG_GET_RDMA:
410 if (nob < base_nob + sizeof (koib_rdma_msg_t)) {
411 CERROR ("Short RDMA msg from "LPX64": %d\n",
412 conn->ibc_peer->ibp_nid, nob);
/* NOTE(review): presumably these swabs only run for an opposite-endian
 * sender; the guarding test is not visible in this copy. */
416 __swab32s(&msg->oibm_u.rdma.oibrm_desc.rd_key);
417 __swab32s(&msg->oibm_u.rdma.oibrm_desc.rd_nob);
418 __swab64s(&msg->oibm_u.rdma.oibrm_desc.rd_addr);
420 CDEBUG(D_NET, "%d RDMA: cookie "LPX64", key %x, addr "LPX64", nob %d\n",
421 msg->oibm_type, msg->oibm_u.rdma.oibrm_cookie,
422 msg->oibm_u.rdma.oibrm_desc.rd_key,
423 msg->oibm_u.rdma.oibrm_desc.rd_addr,
424 msg->oibm_u.rdma.oibrm_desc.rd_nob);
427 case OPENIBNAL_MSG_PUT_DONE:
428 case OPENIBNAL_MSG_GET_DONE:
429 if (nob < base_nob + sizeof (koib_completion_msg_t)) {
430 CERROR ("Short COMPLETION msg from "LPX64": %d\n",
431 conn->ibc_peer->ibp_nid, nob);
435 __swab32s(&msg->oibm_u.completion.oibcm_status);
437 CDEBUG(D_NET, "%d DONE: cookie "LPX64", status %d\n",
438 msg->oibm_type, msg->oibm_u.completion.oibcm_cookie,
439 msg->oibm_u.completion.oibcm_status);
/* completions are handled right here, then the buffer is recycled */
441 koibnal_complete_passive_rdma (conn,
442 msg->oibm_u.completion.oibcm_cookie,
443 msg->oibm_u.completion.oibcm_status);
444 koibnal_post_rx (rx, 1);
448 CERROR ("Can't parse type from "LPX64": %d\n",
449 conn->ibc_peer->ibp_nid, msg->oibm_type);
453 /* schedule for koibnal_rx() in thread context */
454 spin_lock_irqsave(&koibnal_data.koib_sched_lock, flags);
456 list_add_tail (&rx->rx_list, &koibnal_data.koib_sched_rxq);
457 wake_up (&koibnal_data.koib_sched_waitq);
459 spin_unlock_irqrestore(&koibnal_data.koib_sched_lock, flags);
/* failure path: tear the connection down */
463 CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
464 koibnal_close_conn(conn, -ECONNABORTED);
466 /* Don't re-post rx & drop its ref on conn */
467 koibnal_put_conn(conn);
/* Thread-context handler for a receive queued by koibnal_rx_callback().
 *
 * The portals header embedded in the message is passed to lib_parse(), with
 * 'rx' as the private argument, according to the message type:
 *  - GET_RDMA: if the GET was not matched, a failed GET_DONE (-EIO) is sent
 *    straight away so the peer's GET doesn't block for the full timeout.
 *  - PUT_RDMA: an uncompleted PUT is only logged.
 *  - IMMEDIATE: must not have started an RDMA (asserted).
 * Finally the rx buffer is reposted with koibnal_post_rx().
 * NOTE(review): the statement that clears the RDMA flag and the tests that
 * read it back after lib_parse() are not visible in this copy; the comments
 * below imply they are on rx->rx_rdma. */
471 koibnal_rx (koib_rx_t *rx)
473 koib_msg_t *msg = rx->rx_msg;
475 /* Clear flag so I can detect if I've sent an RDMA completion */
478 switch (msg->oibm_type) {
479 case OPENIBNAL_MSG_GET_RDMA:
480 lib_parse(&koibnal_lib, &msg->oibm_u.rdma.oibrm_hdr, rx);
481 /* If the incoming get was matched, I'll have initiated the
482 * RDMA and the completion message... */
486 /* Otherwise, I'll send a failed completion now to prevent
487 * the peer's GET blocking for the full timeout. */
488 CERROR ("Completing unmatched RDMA GET from "LPX64"\n",
489 rx->rx_conn->ibc_peer->ibp_nid);
490 koibnal_start_active_rdma (OPENIBNAL_MSG_GET_DONE, -EIO,
491 rx, NULL, 0, NULL, NULL, 0, 0);
494 case OPENIBNAL_MSG_PUT_RDMA:
495 lib_parse(&koibnal_lib, &msg->oibm_u.rdma.oibrm_hdr, rx);
498 /* This is most unusual, since even if lib_parse() didn't
499 * match anything, it should have asked us to read (and
500 * discard) the payload. The portals header must be
501 * inconsistent with this message type, so it's the
502 * sender's fault for sending garbage and she can time
504 CERROR ("Uncompleted RMDA PUT from "LPX64"\n",
505 rx->rx_conn->ibc_peer->ibp_nid);
508 case OPENIBNAL_MSG_IMMEDIATE:
509 lib_parse(&koibnal_lib, &msg->oibm_u.immediate.oibim_hdr, rx);
510 LASSERT (!rx->rx_rdma);
518 koibnal_post_rx (rx, 1);
/* Translate kernel virtual address 'vaddr' to a physical address, stored in
 * '*physp'.
 * Addresses in the vmalloc range, and addresses in the PKMAP window
 * [PKMAP_BASE, PKMAP_BASE + LAST_PKMAP * PAGE_SIZE), are resolved to their
 * page with vmalloc_to_page(); everything else uses virt_to_page().  The
 * result is the page's physical address plus vaddr's offset within the page.
 * NOTE(review): the upper bound of the vmalloc test, any check on 'page' and
 * the return value are not visible in this copy of the function. */
523 koibnal_kvaddr_to_phys (unsigned long vaddr, __u64 *physp)
527 if (vaddr >= VMALLOC_START &&
529 page = vmalloc_to_page ((void *)vaddr);
531 else if (vaddr >= PKMAP_BASE &&
532 vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
533 page = vmalloc_to_page ((void *)vaddr);
534 /* in 2.4 ^ just walks the page tables */
537 page = virt_to_page (vaddr);
/* page base plus the low-order bits of vaddr (offset inside the page) */
543 *physp = koibnal_page2phys(page) + (vaddr & (PAGE_SIZE - 1));
/* Register a virtually addressed buffer for RDMA on behalf of 'tx'.
 *
 * Whole iov entries lying before 'offset' are skipped.  The 'nob' bytes to
 * map must then fit inside the single iov entry that contains 'offset':
 * multiple virtual fragments can't be mapped and are rejected with an error.
 * The fragment's address is saved in tx_md.md_addr and registered with
 * ib_memory_register() using the access rights in 'access'; on success the
 * tx is marked KOIB_TX_MAPPED.  The tx must be unmapped on entry (asserted).
 * NOTE(review): the loop body that advances 'iov', the remaining
 * ib_memory_register() arguments and the return values are not visible in
 * this copy of the function. */
549 koibnal_map_iov (koib_tx_t *tx, enum ib_memory_access access,
550 int niov, struct iovec *iov, int offset, int nob)
558 LASSERT (tx->tx_mapped == KOIB_TX_UNMAPPED);
/* find the iov entry that contains 'offset' */
560 while (offset >= iov->iov_len) {
561 offset -= iov->iov_len;
567 if (nob > iov->iov_len - offset) {
568 CERROR ("Can't map multiple vaddr fragments\n");
572 vaddr = (void *)(((unsigned long)iov->iov_base) + offset);
573 tx->tx_md.md_addr = (__u64)((unsigned long)vaddr);
575 rc = ib_memory_register (koibnal_data.koib_pd,
578 &tx->tx_md.md_handle.mr,
583 CERROR ("Can't map vaddr: %d\n", rc);
587 tx->tx_mapped = KOIB_TX_MAPPED;
/* Register a page-vector buffer for RDMA on behalf of 'tx'.
 *
 * Skips whole kiov entries lying before 'offset', then collects the physical
 * address of each page covering the next 'nob' bytes into a temporary array
 * and registers them as one region based at OPENIBNAL_RDMA_BASE.  The
 * payload must be contiguous in I/O virtual memory: every page after the
 * first must start at offset 0 and, while more than a page remains, must be
 * a full page.  At most PTL_MD_MAX_IOV pages can be mapped.  The temporary
 * array is freed before returning.
 *
 * NOTE(review): two alternative variants are interleaved below (an FMR one
 * using ib_fmr_register_physical() with KOIB_TX_MAPPED_FMR, and a plain one
 * using ib_memory_register_physical() with KOIB_TX_MAPPED); presumably
 * preprocessor conditionals not visible in this copy select between them.
 * The remaining parameters, the loop header and the return values are not
 * visible either. */
592 koibnal_map_kiov (koib_tx_t *tx, enum ib_memory_access access,
593 int nkiov, ptl_kiov_t *kiov,
598 const int mapped = KOIB_TX_MAPPED_FMR;
600 struct ib_physical_buffer *phys;
601 const int mapped = KOIB_TX_MAPPED;
609 CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
613 LASSERT (tx->tx_mapped == KOIB_TX_UNMAPPED);
/* find the kiov entry that contains 'offset' */
615 while (offset >= kiov->kiov_len) {
616 offset -= kiov->kiov_len;
/* one slot per remaining page is the most that can be needed */
622 phys_size = nkiov * sizeof (*phys);
623 PORTAL_ALLOC(phys, phys_size);
625 CERROR ("Can't allocate tmp phys\n");
/* where the payload starts within the first mapped page */
629 page_offset = kiov->kiov_offset + offset;
631 phys[0] = koibnal_page2phys(kiov->kiov_page);
633 phys[0].address = koibnal_page2phys(kiov->kiov_page);
634 phys[0].size = PAGE_SIZE;
/* bytes still to be covered by the pages after the first */
637 resid = nob - (kiov->kiov_len - offset);
644 if (kiov->kiov_offset != 0 ||
645 ((resid > PAGE_SIZE) &&
646 kiov->kiov_len < PAGE_SIZE)) {
648 /* Can't have gaps */
649 CERROR ("Can't make payload contiguous in I/O VM:"
650 "page %d, offset %d, len %d \n", nphys,
651 kiov->kiov_offset, kiov->kiov_len);
/* NOTE(review): the negative start index suggests 'kiov' has been
 * advanced nphys entries by this point, so this dumps the vector from
 * the first mapped entry; the advance itself is not visible here. */
653 for (i = -nphys; i < nkiov; i++)
655 CERROR("kiov[%d] %p +%d for %d\n",
656 i, kiov[i].kiov_page, kiov[i].kiov_offset, kiov[i].kiov_len);
663 if (nphys == PTL_MD_MAX_IOV) {
664 CERROR ("payload too big (%d)\n", nphys);
/* the next store must stay inside the temporary array */
669 LASSERT (nphys * sizeof (*phys) < phys_size);
671 phys[nphys] = koibnal_page2phys(kiov->kiov_page);
673 phys[nphys].address = koibnal_page2phys(kiov->kiov_page);
674 phys[nphys].size = PAGE_SIZE;
682 CWARN ("nphys %d, nob %d, page_offset %d\n", nphys, nob, page_offset);
683 for (rc = 0; rc < nphys; rc++)
684 CWARN (" [%d] "LPX64" / %d\n", rc, phys[rc].address, phys[rc].size);
686 tx->tx_md.md_addr = OPENIBNAL_RDMA_BASE;
689 rc = ib_fmr_register_physical (koibnal_data.koib_fmr_pool,
693 &tx->tx_md.md_handle.fmr,
697 rc = ib_memory_register_physical (koibnal_data.koib_pd,
702 &tx->tx_md.md_handle.mr,
707 CDEBUG(D_NET, "Mapped %d pages %d bytes @ offset %d: lkey %x, rkey %x\n",
708 nphys, nob, page_offset, tx->tx_md.md_lkey, tx->tx_md.md_rkey);
709 tx->tx_mapped = mapped;
711 CERROR ("Can't map phys: %d\n", rc);
716 PORTAL_FREE(phys, phys_size);
/* Return the first connection on peer->ibp_conns.
 * koibnal_launch_tx() calls this while holding koib_global_lock, as the
 * _locked suffix suggests is required.
 * NOTE(review): what is returned when the list is empty is not visible in
 * this copy; callers treat the result as possibly absent, so presumably
 * NULL. */
721 koibnal_find_conn_locked (koib_peer_t *peer)
723 struct list_head *tmp;
725 /* just return the first connection */
726 list_for_each (tmp, &peer->ibp_conns) {
727 return (list_entry(tmp, koib_conn_t, ibc_list));
/* Push as many queued transmits on 'conn' out to the QP as flow control
 * allows.
 *
 * 1. If nothing is queued but at least OPENIBNAL_CREDIT_HIGHWATER credits
 *    are waiting to be returned to the peer, a NOOP is queued to carry them
 *    (using a non-blocking tx allocation and taking a conn ref for it).
 * 2. For each tx at the head of ibc_tx_queue, sending stops when
 *    OPENIBNAL_MSG_QUEUE_SIZE sends are already posted, when there are no
 *    credits, or when only the last credit is left and there are no credits
 *    to give back.  Otherwise the tx is dequeued; a passive RDMA tx goes on
 *    ibc_rdma_queue with a deadline so the completion can find it; all
 *    outstanding credits are piggy-backed on the message; the checksum is
 *    computed; and each work item is posted with ib_send().
 * 3. If posting fails, the accounting is rolled back, the connection is
 *    closed, and the tx is completed via koibnal_tx_done().
 *
 * ibc_lock protects the queues and counters but is dropped around the
 * ib_send() calls.
 * NOTE(review): several tests, loop exits and the handling of a redundant
 * NOOP are not visible in this copy of the function. */
734 koibnal_check_sends (koib_conn_t *conn)
743 spin_lock_irqsave (&conn->ibc_lock, flags);
/* idle queue but plenty of credits owed to the peer: send a NOOP */
745 if (list_empty(&conn->ibc_tx_queue) &&
746 conn->ibc_outstanding_credits >= OPENIBNAL_CREDIT_HIGHWATER) {
747 spin_unlock_irqrestore(&conn->ibc_lock, flags);
749 tx = koibnal_get_idle_tx(0); /* don't block */
751 koibnal_init_tx_msg(tx, OPENIBNAL_MSG_NOOP, 0);
753 spin_lock_irqsave(&conn->ibc_lock, flags);
/* the queued tx holds its own ref on the conn */
756 atomic_inc(&conn->ibc_refcount);
757 koibnal_queue_tx_locked(tx, conn);
761 LASSERT (conn->ibc_nsends_posted <= OPENIBNAL_MSG_QUEUE_SIZE);
763 while (!list_empty (&conn->ibc_tx_queue)) {
764 tx = list_entry (conn->ibc_tx_queue.next, koib_tx_t, tx_list);
766 /* We rely on this for QP sizing */
767 LASSERT (tx->tx_nsp > 0 && tx->tx_nsp <= 2);
769 LASSERT (conn->ibc_outstanding_credits >= 0);
770 LASSERT (conn->ibc_outstanding_credits <= OPENIBNAL_MSG_QUEUE_SIZE);
771 LASSERT (conn->ibc_credits >= 0);
772 LASSERT (conn->ibc_credits <= OPENIBNAL_MSG_QUEUE_SIZE);
774 /* Not on ibc_rdma_queue */
775 LASSERT (!tx->tx_passive_rdma_wait);
/* send queue already full */
777 if (conn->ibc_nsends_posted == OPENIBNAL_MSG_QUEUE_SIZE)
780 if (conn->ibc_credits == 0) /* no credits */
783 if (conn->ibc_credits == 1 && /* last credit reserved for */
784 conn->ibc_outstanding_credits == 0) /* giving back credits */
787 list_del (&tx->tx_list);
/* a NOOP is redundant if another message is queued behind it or there
 * are now too few credits to be worth returning.
 * NOTE(review): what is done with it between the unlock and relock
 * below is not visible in this copy. */
789 if (tx->tx_msg->oibm_type == OPENIBNAL_MSG_NOOP &&
790 (!list_empty(&conn->ibc_tx_queue) ||
791 conn->ibc_outstanding_credits < OPENIBNAL_CREDIT_HIGHWATER)) {
793 spin_unlock_irqrestore(&conn->ibc_lock, flags);
795 spin_lock_irqsave(&conn->ibc_lock, flags);
799 /* incoming RDMA completion can find this one now */
800 if (tx->tx_passive_rdma) {
801 list_add (&tx->tx_list, &conn->ibc_rdma_queue);
802 tx->tx_passive_rdma_wait = 1;
803 tx->tx_passive_rdma_deadline =
804 jiffies + koibnal_tunables.koib_io_timeout * HZ;
/* hand every credit I owe the peer back in this message */
807 tx->tx_msg->oibm_credits = conn->ibc_outstanding_credits;
808 conn->ibc_outstanding_credits = 0;
810 /* use the free memory barrier when we unlock to ensure
811 * sending set before we can get the tx callback. */
812 conn->ibc_nsends_posted++;
814 tx->tx_sending = tx->tx_nsp;
/* the checksum is computed with the checksum field itself zeroed */
817 tx->tx_msg->oibm_cksum = 0;
818 tx->tx_msg->oibm_cksum = koibnal_cksum(tx->tx_msg, tx->tx_msg->oibm_nob);
819 CDEBUG(D_NET, "cksum %x, nob %d\n", tx->tx_msg->oibm_cksum, tx->tx_msg->oibm_nob);
821 spin_unlock_irqrestore (&conn->ibc_lock, flags);
823 /* NB the gap between removing tx from the queue and sending it
824 * allows message re-ordering to occur */
826 LASSERT (tx->tx_nsp > 0);
830 if (conn->ibc_state == OPENIBNAL_CONN_ESTABLISHED) {
832 /* Driver only accepts 1 item at a time */
833 for (i = 0; i < tx->tx_nsp; i++) {
834 rc = ib_send (conn->ibc_qp, &tx->tx_sp[i], 1);
/* failure path: undo the accounting done before posting */
841 spin_lock_irqsave (&conn->ibc_lock, flags);
843 /* NB credits are transferred in the actual
844 * message, which can only be the last work item */
845 conn->ibc_outstanding_credits += tx->tx_msg->oibm_credits;
847 conn->ibc_nsends_posted--;
/* work items that were never posted will get no callback */
848 tx->tx_sending -= tx->tx_nsp - nwork;
850 done = (tx->tx_sending == 0);
852 if (tx->tx_passive_rdma) {
853 tx->tx_passive_rdma_wait = 0;
854 list_del (&tx->tx_list);
857 spin_unlock_irqrestore (&conn->ibc_lock, flags);
859 if (conn->ibc_state == OPENIBNAL_CONN_ESTABLISHED)
860 CERROR ("Error %d posting transmit to "LPX64"\n",
861 rc, conn->ibc_peer->ibp_nid);
863 CDEBUG (D_NET, "Error %d posting transmit to "
864 LPX64"\n", rc, conn->ibc_peer->ibp_nid);
866 koibnal_close_conn (conn, rc);
869 koibnal_tx_done (tx);
875 spin_unlock_irqrestore (&conn->ibc_lock, flags);
/* Completion queue handler for sends.
 *
 * Recovers the tx from the work request id.  Under ibc_lock it works out
 * whether this callback makes the tx idle (the final work item has completed
 * and no passive RDMA completion is still awaited), releases the conn's send
 * slot once tx_sending reaches 0, and records -ECONNABORTED in tx_status
 * when the completion reports an error.  The tx is then passed to
 * koibnal_tx_done().  A failed completion closes the connection with
 * -ENETDOWN; otherwise koibnal_check_sends() is called to post more sends.
 * A ref on 'conn' is dropped at the end.
 * NOTE(review): the assignment of 'conn', the update of tx_sending and the
 * tests on 'idle' are not visible in this copy of the function. */
879 koibnal_tx_callback (struct ib_cq *cq, struct ib_cq_entry *e, void *arg)
881 koib_tx_t *tx = (koib_tx_t *)((unsigned long)e->work_request_id);
887 LASSERT (conn != NULL);
888 LASSERT (tx->tx_sending != 0);
890 spin_lock_irqsave(&conn->ibc_lock, flags);
892 CDEBUG(D_NET, "conn %p tx %p [%d/%d]: %d\n", conn, tx,
893 tx->tx_nsp - tx->tx_sending, tx->tx_nsp,
896 /* I could be racing with rdma completion. Whoever makes 'tx' idle
897 * gets to free it, which also drops its ref on 'conn'. If it's
898 * not me, then I take an extra ref on conn so it can't disappear
902 idle = (tx->tx_sending == 0) && /* This is the final callback */
903 (!tx->tx_passive_rdma_wait); /* Not waiting for RDMA completion */
905 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
906 conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
907 atomic_read (&conn->ibc_refcount));
908 atomic_inc (&conn->ibc_refcount);
/* all work items of this tx have completed: release its send slot */
910 if (tx->tx_sending == 0)
911 conn->ibc_nsends_posted--;
913 if (e->status != IB_COMPLETION_STATUS_SUCCESS &&
915 tx->tx_status = -ECONNABORTED;
917 spin_unlock_irqrestore(&conn->ibc_lock, flags);
920 koibnal_tx_done (tx);
922 if (e->status != IB_COMPLETION_STATUS_SUCCESS) {
923 CERROR ("Tx completion to "LPX64" failed: %d\n",
924 conn->ibc_peer->ibp_nid, e->status);
925 koibnal_close_conn (conn, -ENETDOWN);
927 /* can I shovel some more sends out the door? */
928 koibnal_check_sends(conn);
931 koibnal_put_conn (conn);
/* Prepare the message in 'tx' for sending as the tx's next work item.
 *
 * Fills in the message header (magic, version, 'type', and the total byte
 * count: the fixed header up to oibm_u plus 'body_nob'), which must fit in
 * OPENIBNAL_MSG_SIZE.  It then sets up the gather entry and send parameters
 * at index tx_nsp, which must be a free slot in tx_sp[], to transmit the
 * message from tx_vaddr using the tx pages' lkey.  The work request id is
 * the tx pointer, which koibnal_tx_callback() uses to find the tx again.
 * The send is fenced when it is a PUT_DONE that follows an earlier work
 * item in the same tx, i.e. when it is bundled with an RDMA read.
 * NOTE(review): the remaining initialiser fields and the increment of
 * tx_nsp are not visible in this copy of the function. */
935 koibnal_init_tx_msg (koib_tx_t *tx, int type, int body_nob)
937 struct ib_gather_scatter *gl = &tx->tx_gl[tx->tx_nsp];
938 struct ib_send_param *sp = &tx->tx_sp[tx->tx_nsp];
940 int nob = offsetof (koib_msg_t, oibm_u) + body_nob;
942 LASSERT (tx->tx_nsp >= 0 &&
943 tx->tx_nsp < sizeof(tx->tx_sp)/sizeof(tx->tx_sp[0]));
944 LASSERT (nob <= OPENIBNAL_MSG_SIZE);
946 tx->tx_msg->oibm_magic = OPENIBNAL_MSG_MAGIC;
947 tx->tx_msg->oibm_version = OPENIBNAL_MSG_VERSION;
948 tx->tx_msg->oibm_type = type;
950 tx->tx_msg->oibm_nob = nob;
952 /* Fence the message if it's bundled with an RDMA read */
953 fence = (tx->tx_nsp > 0) &&
954 (type == OPENIBNAL_MSG_PUT_DONE);
956 *gl = (struct ib_gather_scatter) {
957 .address = tx->tx_vaddr,
959 .key = koibnal_data.koib_tx_pages->oibp_lkey,
962 /* NB If this is an RDMA read, the completion message must wait for
963 * the RDMA to complete. Sends wait for previous RDMA writes
965 *sp = (struct ib_send_param) {
966 .work_request_id = (__u64)((unsigned long)tx),
969 .num_gather_entries = 1,
970 .device_specific = NULL,
971 .solicited_event = 1,
973 .immediate_data_valid = 0,
/* Queue 'tx' on 'conn' and try to send it.
 * Takes ibc_lock around koibnal_queue_tx_locked(), then calls
 * koibnal_check_sends() with the lock released.  Callers take a ref on
 * 'conn' for the tx before calling this. */
982 koibnal_queue_tx (koib_tx_t *tx, koib_conn_t *conn)
986 spin_lock_irqsave(&conn->ibc_lock, flags);
988 koibnal_queue_tx_locked (tx, conn);
990 spin_unlock_irqrestore(&conn->ibc_lock, flags);
992 koibnal_check_sends(conn);
/* Send a fully prepared 'tx' to peer 'nid'.  From here on the send is
 * committed: any problem completes the tx with failure via
 * koibnal_tx_done().
 *
 * Fast path: look the peer up under the global lock; an unknown peer fails
 * the tx with -EHOSTUNREACH, and an existing connection gets a ref for the
 * tx and the tx queued on it.
 *
 * Slow path (no connection yet): retake the global lock for writing and
 * repeat the lookups, since things may have changed while it was dropped.
 * If there is still no connection and none is being made, the tx fails with
 * -EHOSTUNREACH while the peer's reconnect time has not been reached;
 * otherwise the peer is flagged as connecting, given an extra ref, and
 * handed to the connection daemon via koib_connd_peers.  The tx then waits
 * on peer->ibp_tx_queue until the connection is established.
 * NOTE(review): the NULL tests on 'peer' and 'conn' and the initial
 * read_lock are not visible in this copy of the function. */
996 koibnal_launch_tx (koib_tx_t *tx, ptl_nid_t nid)
1001 rwlock_t *g_lock = &koibnal_data.koib_global_lock;
1003 /* If I get here, I've committed to send, so I complete the tx with
1004 * failure on any problems */
1006 LASSERT (tx->tx_conn == NULL); /* only set when assigned a conn */
1007 LASSERT (tx->tx_nsp > 0); /* work items have been set up */
1011 peer = koibnal_find_peer_locked (nid);
1013 read_unlock (g_lock);
1014 tx->tx_status = -EHOSTUNREACH;
1015 koibnal_tx_done (tx);
1019 conn = koibnal_find_conn_locked (peer);
1021 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1022 conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1023 atomic_read (&conn->ibc_refcount));
1024 atomic_inc (&conn->ibc_refcount); /* 1 ref for the tx */
1025 read_unlock (g_lock);
1027 koibnal_queue_tx (tx, conn);
1031 /* Making one or more connections; I'll need a write lock... */
1032 read_unlock (g_lock);
1033 write_lock_irqsave (g_lock, flags);
/* the lock was dropped above, so peer and conn must be looked up again */
1035 peer = koibnal_find_peer_locked (nid);
1037 write_unlock_irqrestore (g_lock, flags);
1038 tx->tx_status = -EHOSTUNREACH;
1039 koibnal_tx_done (tx);
1043 conn = koibnal_find_conn_locked (peer);
1045 /* Connection exists; queue message on it */
1046 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1047 conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1048 atomic_read (&conn->ibc_refcount));
1049 atomic_inc (&conn->ibc_refcount); /* 1 ref for the tx */
1050 write_unlock_irqrestore (g_lock, flags);
1052 koibnal_queue_tx (tx, conn);
1056 if (peer->ibp_connecting == 0) {
/* too soon to retry connecting to this peer: fail the send */
1057 if (!time_after_eq(jiffies, peer->ibp_reconnect_time)) {
1058 write_unlock_irqrestore (g_lock, flags);
1059 tx->tx_status = -EHOSTUNREACH;
1060 koibnal_tx_done (tx);
1064 peer->ibp_connecting = 1;
1065 atomic_inc (&peer->ibp_refcount); /* extra ref for connd */
1067 spin_lock (&koibnal_data.koib_connd_lock);
1069 list_add_tail (&peer->ibp_connd_list,
1070 &koibnal_data.koib_connd_peers);
1071 wake_up (&koibnal_data.koib_connd_waitq);
1073 spin_unlock (&koibnal_data.koib_connd_lock);
1076 /* A connection is being established; queue the message... */
1077 list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);
1079 write_unlock_irqrestore (g_lock, flags);
/* Start an RDMA transfer in which the peer moves the data.
 *
 * 'type' is OPENIBNAL_MSG_PUT_RDMA (the buffer is mapped for remote read)
 * or OPENIBNAL_MSG_GET_RDMA (mapped for remote and local write).  The buffer
 * described by libmsg->md is mapped with koibnal_map_iov() or
 * koibnal_map_kiov() depending on PTL_MD_KIOV, and a message carrying the
 * portals header, the tx's passive RDMA cookie and the rkey, address and
 * length of the mapping is launched to 'nid'.  For a GET, a reply libmsg is
 * also created and attached as tx_libmsg[1].  The libmsgs are finalised
 * when the tx completes.
 *
 * Must not be called in interrupt context: mapping could block, and the tx
 * allocation may block too.
 * NOTE(review): the error labels and return values are not visible in this
 * copy; the trailing koibnal_tx_done() looks like the failure path that
 * releases the tx. */
1083 koibnal_start_passive_rdma (int type, ptl_nid_t nid,
1084 lib_msg_t *libmsg, ptl_hdr_t *hdr)
1086 int nob = libmsg->md->length;
1092 LASSERT (type == OPENIBNAL_MSG_PUT_RDMA ||
1093 type == OPENIBNAL_MSG_GET_RDMA);
1095 LASSERT (!in_interrupt()); /* Mapping could block */
/* a PUT lets the peer read my buffer; a GET lets the peer write into it */
1097 if (type == OPENIBNAL_MSG_PUT_RDMA) {
1098 access = IB_ACCESS_REMOTE_READ;
1100 access = IB_ACCESS_REMOTE_WRITE |
1101 IB_ACCESS_LOCAL_WRITE;
1104 tx = koibnal_get_idle_tx (1); /* May block; caller is an app thread */
1105 LASSERT (tx != NULL);
1107 if ((libmsg->md->options & PTL_MD_KIOV) == 0)
1108 rc = koibnal_map_iov (tx, access,
1109 libmsg->md->md_niov,
1110 libmsg->md->md_iov.iov,
1113 rc = koibnal_map_kiov (tx, access,
1114 libmsg->md->md_niov,
1115 libmsg->md->md_iov.kiov,
1119 CERROR ("Can't map RDMA for "LPX64": %d\n", nid, rc);
1123 if (type == OPENIBNAL_MSG_GET_RDMA) {
1124 /* reply gets finalized when tx completes */
1125 tx->tx_libmsg[1] = lib_create_reply_msg(&koibnal_lib,
1127 if (tx->tx_libmsg[1] == NULL) {
1128 CERROR ("Can't create reply for GET -> "LPX64"\n",
/* koibnal_check_sends() parks such a tx on ibc_rdma_queue once sent */
1135 tx->tx_passive_rdma = 1;
1137 oibmsg = tx->tx_msg;
/* tell the peer where my buffer is and how to name the completion */
1139 oibmsg->oibm_u.rdma.oibrm_hdr = *hdr;
1140 oibmsg->oibm_u.rdma.oibrm_cookie = tx->tx_passive_rdma_cookie;
1141 oibmsg->oibm_u.rdma.oibrm_desc.rd_key = tx->tx_md.md_rkey;
1142 oibmsg->oibm_u.rdma.oibrm_desc.rd_addr = tx->tx_md.md_addr;
1143 oibmsg->oibm_u.rdma.oibrm_desc.rd_nob = nob;
1145 koibnal_init_tx_msg (tx, type, sizeof (koib_rdma_msg_t));
1147 CDEBUG(D_NET, "Passive: %p cookie "LPX64", key %x, addr "
1149 tx, tx->tx_passive_rdma_cookie, tx->tx_md.md_rkey,
1150 tx->tx_md.md_addr, nob);
1152 /* libmsg gets finalized when tx completes. */
1153 tx->tx_libmsg[0] = libmsg;
1155 koibnal_launch_tx(tx, nid);
1160 koibnal_tx_done (tx);
/* Perform the RDMA requested by the incoming message in 'rx' and send the
 * completion message back on the same connection.
 *
 * 'type' is OPENIBNAL_MSG_GET_DONE (answering a GET_RDMA with an RDMA write)
 * or OPENIBNAL_MSG_PUT_DONE (answering a PUT_RDMA with an RDMA read).
 * 'status' is the completion status reported to the peer; no data may be
 * transferred when it is non-zero.  The local buffer is given either as
 * 'iov' or as 'kiov', never both.
 *
 * When there is data to move the buffer is mapped and an RDMA work item
 * targeting the address and rkey from the peer's descriptor is set up ahead
 * of the completion message; 'libmsg' is then finalised when the tx
 * completes.  Otherwise 'libmsg' is finalised immediately.  If no tx
 * descriptor is available, 'libmsg' is finalised locally with PTL_NO_SPACE.
 *
 * Called by the scheduler; must not be in interrupt context and must not
 * block waiting for a tx.
 * NOTE(review): the 'niov' parameter declaration, the statement that sets
 * rx->rx_rdma and several tests and branch targets are not visible in this
 * copy of the function. */
1165 koibnal_start_active_rdma (int type, int status,
1166 koib_rx_t *rx, lib_msg_t *libmsg,
1168 struct iovec *iov, ptl_kiov_t *kiov,
1169 size_t offset, size_t nob)
1171 koib_msg_t *rxmsg = rx->rx_msg;
1178 CDEBUG(D_NET, "type %d, status %d, niov %d, offset %d, nob %d\n",
1179 type, status, niov, offset, nob);
1181 /* Called by scheduler */
1182 LASSERT (!in_interrupt ());
1184 /* Either all pages or all vaddrs */
1185 LASSERT (!(kiov != NULL && iov != NULL));
1187 /* No data if we're completing with failure */
1188 LASSERT (status == 0 || nob == 0);
1190 LASSERT (type == OPENIBNAL_MSG_GET_DONE ||
1191 type == OPENIBNAL_MSG_PUT_DONE);
1193 /* Flag I'm completing the RDMA. Even if I fail to send the
1194 * completion message, I will have tried my best so further
1195 * attempts shouldn't be tried. */
1196 LASSERT (!rx->rx_rdma);
/* a GET is answered by writing into the peer's buffer, a PUT by reading
 * from it; the incoming message type must agree */
1199 if (type == OPENIBNAL_MSG_GET_DONE) {
1201 rdma_op = IB_OP_RDMA_WRITE;
1202 LASSERT (rxmsg->oibm_type == OPENIBNAL_MSG_GET_RDMA);
1204 access = IB_ACCESS_LOCAL_WRITE;
1205 rdma_op = IB_OP_RDMA_READ;
1206 LASSERT (rxmsg->oibm_type == OPENIBNAL_MSG_PUT_RDMA);
1209 tx = koibnal_get_idle_tx (0); /* Mustn't block */
1211 CERROR ("tx descs exhausted on RDMA from "LPX64
1212 " completing locally with failure\n",
1213 rx->rx_conn->ibc_peer->ibp_nid);
1214 lib_finalize (&koibnal_lib, NULL, libmsg, PTL_NO_SPACE);
1217 LASSERT (tx->tx_nsp == 0);
1220 /* We actually need to transfer some data (the transfer
1221 * size could get truncated to zero when the incoming
1222 * message is matched) */
1225 rc = koibnal_map_kiov (tx, access,
1226 niov, kiov, offset, nob);
1228 rc = koibnal_map_iov (tx, access,
1229 niov, iov, offset, nob);
1232 CERROR ("Can't map RDMA -> "LPX64": %d\n",
1233 rx->rx_conn->ibc_peer->ibp_nid, rc);
1234 /* We'll skip the RDMA and complete with failure. */
/* first work item: the RDMA itself, from my mapped buffer to/from the
 * region the peer described in its request */
1238 tx->tx_gl[0] = (struct ib_gather_scatter) {
1239 .address = tx->tx_md.md_addr,
1241 .key = tx->tx_md.md_lkey,
1244 tx->tx_sp[0] = (struct ib_send_param) {
1245 .work_request_id = (__u64)((unsigned long)tx),
1247 .gather_list = &tx->tx_gl[0],
1248 .num_gather_entries = 1,
1249 .remote_address = rxmsg->oibm_u.rdma.oibrm_desc.rd_addr,
1250 .rkey = rxmsg->oibm_u.rdma.oibrm_desc.rd_key,
1251 .device_specific = NULL,
1252 .solicited_event = 0,
1254 .immediate_data_valid = 0,
/* echo the peer's cookie so it can match this completion to its tx */
1265 txmsg->oibm_u.completion.oibcm_cookie = rxmsg->oibm_u.rdma.oibrm_cookie;
1266 txmsg->oibm_u.completion.oibcm_status = status;
1268 koibnal_init_tx_msg(tx, type, sizeof (koib_completion_msg_t));
1270 if (status == 0 && nob != 0) {
1271 LASSERT (tx->tx_nsp > 1);
1272 /* RDMA: libmsg gets finalized when the tx completes. This
1273 * is after the completion message has been sent, which in
1274 * turn is after the RDMA has finished. */
1275 tx->tx_libmsg[0] = libmsg;
1277 LASSERT (tx->tx_nsp == 1);
1278 /* No RDMA: local completion happens now! */
1279 CDEBUG(D_WARNING,"No data: immediate completion\n");
1280 lib_finalize (&koibnal_lib, NULL, libmsg,
1281 status == 0 ? PTL_OK : PTL_FAIL);
1284 /* +1 ref for this tx... */
1285 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1286 rx->rx_conn, rx->rx_conn->ibc_state,
1287 rx->rx_conn->ibc_peer->ibp_nid,
1288 atomic_read (&rx->rx_conn->ibc_refcount));
1289 atomic_inc (&rx->rx_conn->ibc_refcount);
1290 /* ...and queue it up */
1291 koibnal_queue_tx(tx, rx->rx_conn);
/* Common send path behind koibnal_send() and koibnal_send_pages().
 *
 * The payload is described either by 'payload_iov' or by 'payload_kiov',
 * never both.  Depending on the portals message type:
 *  - REPLY: 'private' is the incoming rx.  If that was a GET_RDMA the reply
 *    data is moved by koibnal_start_active_rdma(); otherwise the request
 *    must have been an IMMEDIATE message and the reply must fit in one
 *    message.
 *  - A request whose expected reply (libmsg->md->length) would not fit in
 *    an immediate message starts a passive GET_RDMA.
 *  - A payload too big for an immediate message starts a passive PUT_RDMA.
 * Everything else is sent as an OPENIBNAL_MSG_IMMEDIATE: the header and
 * payload are copied into the tx message buffer and the tx is launched to
 * 'nid'; 'libmsg' is finalised when the tx completes.  PTL_NO_SPACE is
 * returned if no tx descriptor is available.
 *
 * Sending payload requires thread context (asserted).
 * NOTE(review): most parameter declarations, the switch header, the other
 * case labels and the remaining return statements are not visible in this
 * copy of the function. */
1295 koibnal_sendmsg(lib_nal_t *nal,
1302 unsigned int payload_niov,
1303 struct iovec *payload_iov,
1304 ptl_kiov_t *payload_kiov,
1305 size_t payload_offset,
1312 /* NB 'private' is different depending on what we're sending.... */
1314 CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
1315 " pid %d\n", payload_nob, payload_niov, nid , pid);
1317 LASSERT (payload_nob == 0 || payload_niov > 0);
1318 LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1320 /* Thread context if we're sending payload */
1321 LASSERT (!in_interrupt() || payload_niov == 0);
1322 /* payload is either all vaddrs or all pages */
1323 LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1330 case PTL_MSG_REPLY: {
1331 /* reply's 'private' is the incoming receive */
1332 koib_rx_t *rx = private;
1334 /* RDMA reply expected? */
1335 if (rx->rx_msg->oibm_type == OPENIBNAL_MSG_GET_RDMA) {
1336 koibnal_start_active_rdma(OPENIBNAL_MSG_GET_DONE, 0,
1337 rx, libmsg, payload_niov,
1338 payload_iov, payload_kiov,
1339 payload_offset, payload_nob);
1343 /* Incoming message consistent with immediate reply? */
1344 if (rx->rx_msg->oibm_type != OPENIBNAL_MSG_IMMEDIATE) {
1345 CERROR ("REPLY to "LPX64" bad opbm type %d!!!\n",
1346 nid, rx->rx_msg->oibm_type);
1350 /* Will it fit in a message? */
/* NOTE(review): this check uses >= while the two size checks further
 * down use >; confirm which boundary is intended. */
1351 nob = offsetof(koib_msg_t, oibm_u.immediate.oibim_payload[payload_nob]);
1352 if (nob >= OPENIBNAL_MSG_SIZE) {
1353 CERROR("REPLY for "LPX64" too big (RDMA not requested): %d\n",
1361 /* might the REPLY message be big enough to need RDMA? */
1362 nob = offsetof(koib_msg_t, oibm_u.immediate.oibim_payload[libmsg->md->length]);
1363 if (nob > OPENIBNAL_MSG_SIZE)
1364 return (koibnal_start_passive_rdma(OPENIBNAL_MSG_GET_RDMA,
1369 LASSERT (payload_nob == 0);
1373 /* Is the payload big enough to need RDMA? */
1374 nob = offsetof(koib_msg_t, oibm_u.immediate.oibim_payload[payload_nob]);
1375 if (nob > OPENIBNAL_MSG_SIZE)
1376 return (koibnal_start_passive_rdma(OPENIBNAL_MSG_PUT_RDMA,
/* ACKs and REPLYs (at least) must not block waiting for a descriptor */
1382 tx = koibnal_get_idle_tx(!(type == PTL_MSG_ACK ||
1383 type == PTL_MSG_REPLY ||
1386 CERROR ("Can't send %d to "LPX64": tx descs exhausted%s\n",
1387 type, nid, in_interrupt() ? " (intr)" : "");
1388 return (PTL_NO_SPACE);
1391 oibmsg = tx->tx_msg;
1392 oibmsg->oibm_u.immediate.oibim_hdr = *hdr;
/* immediate sends carry the payload inline in the message buffer */
1394 if (payload_nob > 0) {
1395 if (payload_kiov != NULL)
1396 lib_copy_kiov2buf(oibmsg->oibm_u.immediate.oibim_payload,
1397 payload_niov, payload_kiov,
1398 payload_offset, payload_nob);
1400 lib_copy_iov2buf(oibmsg->oibm_u.immediate.oibim_payload,
1401 payload_niov, payload_iov,
1402 payload_offset, payload_nob);
1405 koibnal_init_tx_msg (tx, OPENIBNAL_MSG_IMMEDIATE,
1406 offsetof(koib_immediate_msg_t,
1407 oibim_payload[payload_nob]));
1409 /* libmsg gets finalized when tx completes */
1410 tx->tx_libmsg[0] = libmsg;
1412 koibnal_launch_tx(tx, nid);
/* Send entry point for payloads described by virtual addresses: forwards
 * everything to koibnal_sendmsg() with 'payload_iov' as the iovec and no
 * page vector, and returns its result. */
1417 koibnal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
1418 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1419 unsigned int payload_niov, struct iovec *payload_iov,
1420 size_t payload_offset, size_t payload_len)
1422 return (koibnal_sendmsg(nal, private, cookie,
1423 hdr, type, nid, pid,
1424 payload_niov, payload_iov, NULL,
1425 payload_offset, payload_len));
/* Send entry point for payloads described by pages: forwards everything to
 * koibnal_sendmsg() with 'payload_kiov' as the page vector and no iovec,
 * and returns its result. */
1429 koibnal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
1430 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1431 unsigned int payload_niov, ptl_kiov_t *payload_kiov,
1432 size_t payload_offset, size_t payload_len)
1434 return (koibnal_sendmsg(nal, private, cookie,
1435 hdr, type, nid, pid,
1436 payload_niov, NULL, payload_kiov,
1437 payload_offset, payload_len));
/* Common receive path behind koibnal_recv() and koibnal_recv_pages().
 *
 * 'private' is the koib_rx_t the message arrived in; the destination is
 * described by either 'iov' or 'kiov' (never both), starting 'offset'
 * bytes in. 'mlen' must not exceed 'rlen', and this must not be called
 * in interrupt context.
 *
 * Dispatches on the received message type:
 *  - IMMEDIATE: the payload is copied out of the rx buffer into the
 *    destination and libmsg is finalized with PTL_OK; a message whose
 *    header + rlen would not fit in OPENIBNAL_MSG_SIZE is reported.
 *  - GET_RDMA:  libmsg must be NULL; only lib_finalize() is called.
 *  - PUT_RDMA:  an active RDMA is started towards the destination,
 *    completing with OPENIBNAL_MSG_PUT_DONE. */
1441 koibnal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
1442 unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
1443 size_t offset, size_t mlen, size_t rlen)
1445 koib_rx_t *rx = private;
1446 koib_msg_t *rxmsg = rx->rx_msg;
1449 LASSERT (mlen <= rlen);
1450 LASSERT (!in_interrupt ());
1451 /* Either all pages or all vaddrs */
1452 LASSERT (!(kiov != NULL && iov != NULL));
1454 switch (rxmsg->oibm_type) {
1459 case OPENIBNAL_MSG_IMMEDIATE:
1460 msg_nob = offsetof(koib_msg_t, oibm_u.immediate.oibim_payload[rlen]);
1461 if (msg_nob > OPENIBNAL_MSG_SIZE) {
/* rlen is a size_t but the format is %d: cast so the argument
 * matches the conversion on platforms where size_t is wider
 * than int */
1462 CERROR ("Immediate message from "LPX64" too big: %d\n",
1463 rxmsg->oibm_u.immediate.oibim_hdr.src_nid, (int)rlen);
1468 lib_copy_buf2kiov(niov, kiov, offset,
1469 rxmsg->oibm_u.immediate.oibim_payload,
1472 lib_copy_buf2iov(niov, iov, offset,
1473 rxmsg->oibm_u.immediate.oibim_payload,
1476 lib_finalize (nal, NULL, libmsg, PTL_OK);
1479 case OPENIBNAL_MSG_GET_RDMA:
1480 /* We get called here just to discard any junk after the
1482 LASSERT (libmsg == NULL);
1483 lib_finalize (nal, NULL, libmsg, PTL_OK);
1486 case OPENIBNAL_MSG_PUT_RDMA:
1487 koibnal_start_active_rdma (OPENIBNAL_MSG_PUT_DONE, 0,
1489 niov, iov, kiov, offset, mlen);
/* Receive entry point for a destination described by a struct iovec
 * array: forwards to koibnal_recvmsg() with a NULL kiov and returns
 * its result. */
1495 koibnal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
1496 unsigned int niov, struct iovec *iov,
1497 size_t offset, size_t mlen, size_t rlen)
1499 return (koibnal_recvmsg (nal, private, msg, niov, iov, NULL,
1500 offset, mlen, rlen));
/* Receive entry point for a destination described by a ptl_kiov_t
 * (page) array: forwards to koibnal_recvmsg() with a NULL iov and
 * returns its result. */
1504 koibnal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
1505 unsigned int niov, ptl_kiov_t *kiov,
1506 size_t offset, size_t mlen, size_t rlen)
1508 return (koibnal_recvmsg (nal, private, msg, niov, NULL, kiov,
1509 offset, mlen, rlen));
/* Start a kernel thread running fn(arg) via kernel_thread() and count
 * it in koibnal_data.koib_nthreads; koibnal_thread_fini() decrements
 * the same counter. */
1513 koibnal_thread_start (int (*fn)(void *arg), void *arg)
1515 long pid = kernel_thread (fn, arg, 0);
1520 atomic_inc (&koibnal_data.koib_nthreads);
/* Decrement koibnal_data.koib_nthreads. koibnal_connd() and
 * koibnal_scheduler() call this as their last visible step. */
1525 koibnal_thread_fini (void)
1527 atomic_dec (&koibnal_data.koib_nthreads);
/* Begin closing 'conn'. 'error' is only logged here: 0 is logged at
 * D_NET, anything else at D_ERROR.
 *
 * The conn must be ESTABLISHED or CONNECTING. It is marked DEATHROW,
 * queued on koibnal_data.koib_connd_conns under koib_connd_lock, and
 * the connd waitq is woken so the daemon can finish the job. */
1531 koibnal_close_conn_locked (koib_conn_t *conn, int error)
1533 /* This just does the immediate housekeeping, and schedules the
1534 * connection for the connd to finish off.
1535 * Caller holds koib_global_lock exclusively in irq context */
1536 koib_peer_t *peer = conn->ibc_peer;
1538 CDEBUG (error == 0 ? D_NET : D_ERROR,
1539 "closing conn to "LPX64": error %d\n", peer->ibp_nid, error);
1541 LASSERT (conn->ibc_state == OPENIBNAL_CONN_ESTABLISHED ||
1542 conn->ibc_state == OPENIBNAL_CONN_CONNECTING);
1544 if (conn->ibc_state == OPENIBNAL_CONN_ESTABLISHED) {
1545 /* koib_connd_conns takes ibc_list's ref */
1546 list_del (&conn->ibc_list);
1548 /* new ref for koib_connd_conns */
1549 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1550 conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1551 atomic_read (&conn->ibc_refcount));
1552 atomic_inc (&conn->ibc_refcount);
1555 if (list_empty (&peer->ibp_conns) &&
1556 peer->ibp_persistence == 0) {
1557 /* Non-persistent peer with no more conns... */
1558 koibnal_unlink_peer_locked (peer);
1561 conn->ibc_state = OPENIBNAL_CONN_DEATHROW;
1563 /* Schedule conn for closing/destruction */
1564 spin_lock (&koibnal_data.koib_connd_lock);
1566 list_add_tail (&conn->ibc_list, &koibnal_data.koib_connd_conns);
1567 wake_up (&koibnal_data.koib_connd_waitq);
1569 spin_unlock (&koibnal_data.koib_connd_lock);
/* Locking wrapper around koibnal_close_conn_locked(). Takes
 * koib_global_lock for writing with interrupts saved, asserts the conn
 * has reached at least CONNECTING, and only calls the locked variant
 * (passing 'why' as its error) when the state is no later than
 * ESTABLISHED. */
1573 koibnal_close_conn (koib_conn_t *conn, int why)
1575 unsigned long flags;
1578 write_lock_irqsave (&koibnal_data.koib_global_lock, flags);
1580 LASSERT (conn->ibc_state >= OPENIBNAL_CONN_CONNECTING);
1582 if (conn->ibc_state <= OPENIBNAL_CONN_ESTABLISHED) {
1584 koibnal_close_conn_locked (conn, why);
1587 write_unlock_irqrestore (&koibnal_data.koib_global_lock, flags);
/* Account for one failed connection attempt to 'peer'.
 *
 * Under koib_global_lock (write, irqs saved) ibp_connecting is
 * decremented. When the peer has no conns, the next permitted
 * reconnect time is set, the reconnect interval is doubled (capped at
 * OPENIBNAL_MAX_RECONNECT_INTERVAL), and every tx blocked on
 * ibp_tx_queue is moved to the local 'zombies' list; a still-active
 * peer with ibp_persistence == 0 is unlinked. The zombie txs are then
 * completed with -EHOSTUNREACH through koibnal_tx_done(). */
1592 koibnal_peer_connect_failed (koib_peer_t *peer, int active, int rc)
1594 LIST_HEAD (zombies);
1596 unsigned long flags;
1599 LASSERT (peer->ibp_reconnect_interval >= OPENIBNAL_MIN_RECONNECT_INTERVAL);
1601 write_lock_irqsave (&koibnal_data.koib_global_lock, flags);
1603 LASSERT (peer->ibp_connecting != 0);
1604 peer->ibp_connecting--;
1606 if (peer->ibp_connecting != 0) {
1607 /* another connection attempt under way (loopback?)... */
1608 write_unlock_irqrestore (&koibnal_data.koib_global_lock, flags);
1612 if (list_empty(&peer->ibp_conns)) {
1613 /* Say when active connection can be re-attempted */
1614 peer->ibp_reconnect_time = jiffies + peer->ibp_reconnect_interval;
1615 /* Increase reconnection interval */
1616 peer->ibp_reconnect_interval = MIN (peer->ibp_reconnect_interval * 2,
1617 OPENIBNAL_MAX_RECONNECT_INTERVAL);
1619 /* Take peer's blocked transmits; I'll complete
1620 * them with error */
1621 while (!list_empty (&peer->ibp_tx_queue)) {
1622 tx = list_entry (peer->ibp_tx_queue.next,
1623 koib_tx_t, tx_list);
1625 list_del (&tx->tx_list);
1626 list_add_tail (&tx->tx_list, &zombies);
1629 if (koibnal_peer_active(peer) &&
1630 (peer->ibp_persistence == 0)) {
1631 /* failed connection attempt on non-persistent peer */
1632 koibnal_unlink_peer_locked (peer);
1635 /* Can't have blocked transmits if there are connections */
1636 LASSERT (list_empty(&peer->ibp_tx_queue));
1639 write_unlock_irqrestore (&koibnal_data.koib_global_lock, flags);
/* The global lock has been released: the zombies are reported and
 * completed without it */
1641 if (!list_empty (&zombies))
1642 CERROR ("Deleting messages for "LPX64": connection failed\n",
1645 while (!list_empty (&zombies)) {
1646 tx = list_entry (zombies.next, koib_tx_t, tx_list);
1648 list_del (&tx->tx_list);
1650 tx->tx_status = -EHOSTUNREACH;
1651 koibnal_tx_done (tx);
/* Complete a connection attempt on 'conn'. 'active' is non-zero for a
 * connection this node initiated; 'status' is 0 on success or a
 * negative errno.
 *
 * The connreq (present only on active connections) is freed. If the
 * conn had reached CONNECTING, koibnal_conn_callback is installed as
 * its CM callback. Then, under koib_global_lock:
 *  - success: the conn becomes ESTABLISHED (downgraded to
 *    -ECONNABORTED if the peer is no longer active), is added to the
 *    peer's conn list with its own ref, the reconnect interval is
 *    reset, every tx blocked on ibp_tx_queue is queued on the conn
 *    (one ref each), and stale conns from other incarnations are
 *    closed. After unlocking, all OPENIBNAL_RX_MSGS receives are
 *    posted (one ref each) and koibnal_check_sends() is run.
 *  - failure: a CONNECTING conn is scheduled for close, otherwise it
 *    is marked ZOMBIE; koibnal_peer_connect_failed() is told, and the
 *    caller's ref is dropped unless the conn was CONNECTING (in which
 *    case the IB_CM_IDLE callback is still expected). */
1656 koibnal_connreq_done (koib_conn_t *conn, int active, int status)
1658 int state = conn->ibc_state;
1659 koib_peer_t *peer = conn->ibc_peer;
1661 unsigned long flags;
1665 /* passive connection has no connreq & vice versa */
1666 LASSERT (!active == !(conn->ibc_connreq != NULL));
1668 PORTAL_FREE (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
1669 conn->ibc_connreq = NULL;
1672 if (state == OPENIBNAL_CONN_CONNECTING) {
1673 /* Install common (active/passive) callback for
1674 * disconnect/idle notification if I got as far as getting
1676 rc = tsIbCmCallbackModify(conn->ibc_comm_id,
1677 koibnal_conn_callback, conn);
1681 write_lock_irqsave (&koibnal_data.koib_global_lock, flags);
1683 LASSERT (peer->ibp_connecting != 0);
1686 /* connection established... */
1687 LASSERT (state == OPENIBNAL_CONN_CONNECTING);
1688 conn->ibc_state = OPENIBNAL_CONN_ESTABLISHED;
1690 if (!koibnal_peer_active(peer)) {
1691 /* ...but peer deleted meantime */
1692 status = -ECONNABORTED;
1695 LASSERT (state == OPENIBNAL_CONN_INIT_QP ||
1696 state == OPENIBNAL_CONN_CONNECTING);
1700 /* Everything worked! */
1702 peer->ibp_connecting--;
1704 /* +1 ref for ibc_list; caller(== CM)'s ref remains until
1705 * the IB_CM_IDLE callback */
1706 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1707 conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1708 atomic_read (&conn->ibc_refcount));
1709 atomic_inc (&conn->ibc_refcount);
1710 list_add (&conn->ibc_list, &peer->ibp_conns);
1712 /* reset reconnect interval for next attempt */
1713 peer->ibp_reconnect_interval = OPENIBNAL_MIN_RECONNECT_INTERVAL;
1715 /* post blocked sends to the new connection */
1716 spin_lock (&conn->ibc_lock);
1718 while (!list_empty (&peer->ibp_tx_queue)) {
1719 tx = list_entry (peer->ibp_tx_queue.next,
1720 koib_tx_t, tx_list);
1722 list_del (&tx->tx_list);
1724 /* +1 ref for each tx */
1725 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1726 conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1727 atomic_read (&conn->ibc_refcount));
1728 atomic_inc (&conn->ibc_refcount);
1729 koibnal_queue_tx_locked (tx, conn);
1732 spin_unlock (&conn->ibc_lock);
1734 /* Nuke any dangling conns from a different peer instance... */
1735 koibnal_close_stale_conns_locked (conn->ibc_peer,
1736 conn->ibc_incarnation);
1738 write_unlock_irqrestore (&koibnal_data.koib_global_lock, flags);
1740 /* queue up all the receives */
1741 for (i = 0; i < OPENIBNAL_RX_MSGS; i++) {
1742 /* +1 ref for rx desc */
1743 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1744 conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1745 atomic_read (&conn->ibc_refcount));
1746 atomic_inc (&conn->ibc_refcount);
1748 CDEBUG(D_NET, "RX[%d] %p->%p - "LPX64"\n",
1749 i, &conn->ibc_rxs[i], conn->ibc_rxs[i].rx_msg,
1750 conn->ibc_rxs[i].rx_vaddr);
1752 koibnal_post_rx (&conn->ibc_rxs[i], 0);
1755 koibnal_check_sends (conn);
1759 /* connection failed */
1760 if (state == OPENIBNAL_CONN_CONNECTING) {
1761 /* schedule for connd to close */
1762 koibnal_close_conn_locked (conn, status);
1764 /* Don't have a CM comm_id; just wait for refs to drain */
1765 conn->ibc_state = OPENIBNAL_CONN_ZOMBIE;
1768 write_unlock_irqrestore (&koibnal_data.koib_global_lock, flags);
1770 koibnal_peer_connect_failed (conn->ibc_peer, active, status);
1772 if (state != OPENIBNAL_CONN_CONNECTING) {
1773 /* drop caller's ref if we're not waiting for the
1774 * IB_CM_IDLE callback */
1775 koibnal_put_conn (conn);
/* Passive-side setup for an incoming connection request.
 *
 * connp:       where the new conn is handed back to the caller
 * cid:         CM communication id of the request
 * nid:         NID the remote end claims
 * incarnation: remote incarnation stamp, stored on the conn
 * queue_depth: remote message queue depth; must equal
 *              OPENIBNAL_MSG_QUEUE_SIZE or the request is rejected
 *
 * A conn and a peer are created up front. Under koib_global_lock the
 * peer table is searched for 'nid': if absent, the new peer is
 * inserted; otherwise the new peer is released. The peer that is kept
 * gains a ref for the conn and has ibp_connecting bumped. The conn is
 * then initialised as CONNECTING with full credits.
 *
 * NOTE(review): the conn is created before queue_depth is validated;
 * confirm the rejection path releases it. */
1780 koibnal_accept (koib_conn_t **connp, tTS_IB_CM_COMM_ID cid,
1781 ptl_nid_t nid, __u64 incarnation, int queue_depth)
1783 koib_conn_t *conn = koibnal_create_conn();
1786 unsigned long flags;
1791 if (queue_depth != OPENIBNAL_MSG_QUEUE_SIZE) {
1792 CERROR("Can't accept "LPX64": bad queue depth %d (%d expected)\n",
1793 nid, queue_depth, OPENIBNAL_MSG_QUEUE_SIZE);
1797 /* assume 'nid' is a new peer */
1798 peer = koibnal_create_peer (nid);
/* conn->ibc_peer is only assigned further down, so log the
 * caller-supplied nid instead of dereferencing conn->ibc_peer */
1800 CDEBUG(D_NET, "--conn[%p] state %d -> "LPX64" (%d)\n",
1801 conn, conn->ibc_state, nid,
1802 atomic_read (&conn->ibc_refcount));
1803 atomic_dec (&conn->ibc_refcount);
1804 koibnal_destroy_conn(conn);
1808 write_lock_irqsave (&koibnal_data.koib_global_lock, flags);
1810 peer2 = koibnal_find_peer_locked(nid);
1811 if (peer2 == NULL) {
1812 /* peer table takes my ref on peer */
1813 list_add_tail (&peer->ibp_list,
1814 koibnal_nid2peerlist(nid));
1816 koibnal_put_peer (peer);
1820 /* +1 ref for conn */
1821 atomic_inc (&peer->ibp_refcount);
1822 peer->ibp_connecting++;
1824 write_unlock_irqrestore (&koibnal_data.koib_global_lock, flags);
1826 conn->ibc_peer = peer;
1827 conn->ibc_state = OPENIBNAL_CONN_CONNECTING;
1828 conn->ibc_comm_id = cid;
1829 conn->ibc_incarnation = incarnation;
1830 conn->ibc_credits = OPENIBNAL_MSG_QUEUE_SIZE;
/* CM callback that koibnal_conn_callback() installs (through
 * tsIbCmCallbackModify()) once a connection has been reported idle.
 * No further events are expected, so any event is logged as an error;
 * TS_IB_CM_CALLBACK_PROCEED is returned. */
1836 tTS_IB_CM_CALLBACK_RETURN
1837 koibnal_idle_conn_callback (tTS_IB_CM_EVENT event,
1838 tTS_IB_CM_COMM_ID cid,
1842 /* Shouldn't ever get a callback after TS_IB_CM_IDLE */
1843 CERROR ("Unexpected event %d: conn %p\n", event, arg);
1845 return TS_IB_CM_CALLBACK_PROCEED;
/* CM callback for a connection whose connect request has completed;
 * koibnal_connreq_done() installs it. 'arg' is the koib_conn_t.
 *
 * An error event closes the conn with -ECONNABORTED, a disconnect
 * closes it with 0, and the idle report drops the CM's ref on the conn
 * and switches the CM callback to koibnal_idle_conn_callback.
 * Returns TS_IB_CM_CALLBACK_PROCEED. */
1848 tTS_IB_CM_CALLBACK_RETURN
1849 koibnal_conn_callback (tTS_IB_CM_EVENT event,
1850 tTS_IB_CM_COMM_ID cid,
1854 koib_conn_t *conn = arg;
1857 /* Established Connection Notifier */
1861 CERROR("Connection %p -> "LPX64" ERROR %d\n",
1862 conn, conn->ibc_peer->ibp_nid, event);
1863 koibnal_close_conn (conn, -ECONNABORTED);
1866 case TS_IB_CM_DISCONNECTED:
1867 CDEBUG(D_WARNING, "Connection %p -> "LPX64" DISCONNECTED.\n",
1868 conn, conn->ibc_peer->ibp_nid);
1869 koibnal_close_conn (conn, 0);
1873 CDEBUG(D_NET, "Connection %p -> "LPX64" IDLE.\n",
1874 conn, conn->ibc_peer->ibp_nid);
1875 koibnal_put_conn (conn); /* Lose CM's ref */
1877 /* LASSERT (no further callbacks) */
1878 rc = tsIbCmCallbackModify(cid,
1879 koibnal_idle_conn_callback, conn);
1884 return TS_IB_CM_CALLBACK_PROCEED;
/* CM callback for the passive (listening) side of connection setup.
 * 'arg' is NULL until a connection request has been accepted, and the
 * koib_conn_t afterwards.
 *
 * TS_IB_CM_REQ_RECEIVED: the peer's wire connreq (in the request's
 *   private data) is validated for length, magic and version, a conn
 *   is created with koibnal_accept(), the callback argument is updated
 *   to that conn, and the accept parameters (including our own wire
 *   connreq as reply private data) are filled in.
 * TS_IB_CM_ESTABLISHED: the connreq is completed successfully.
 * Any other event: logged; if a conn exists its connreq is completed
 *   with -ECONNABORTED, otherwise the callback aborts.
 *
 * Returns TS_IB_CM_CALLBACK_ABORT on a rejected request, otherwise
 * TS_IB_CM_CALLBACK_PROCEED. */
1887 tTS_IB_CM_CALLBACK_RETURN
1888 koibnal_passive_conn_callback (tTS_IB_CM_EVENT event,
1889 tTS_IB_CM_COMM_ID cid,
1893 koib_conn_t *conn = arg;
1899 /* no connection yet */
1900 CERROR ("Unexpected event: %d\n", event);
1901 return TS_IB_CM_CALLBACK_ABORT;
1904 CERROR ("Unexpected event %p -> "LPX64": %d\n",
1905 conn, conn->ibc_peer->ibp_nid, event);
1906 koibnal_connreq_done (conn, 0, -ECONNABORTED);
1909 case TS_IB_CM_REQ_RECEIVED: {
1910 struct ib_cm_req_received_param *req = param;
1911 koib_wire_connreq_t *wcr = req->remote_private_data;
1913 LASSERT (conn == NULL);
/* NOTE(review): this reads wcr->wcr_nid before the private data
 * length has been checked below - confirm that is acceptable */
1915 CDEBUG(D_NET, "REQ from "LPX64"\n", le64_to_cpu(wcr->wcr_nid));
1917 if (req->remote_private_data_len < sizeof (*wcr)) {
1918 CERROR("Connect from remote LID %04x: too short %d\n",
1919 req->dlid, req->remote_private_data_len);
1920 return TS_IB_CM_CALLBACK_ABORT;
1923 if (wcr->wcr_magic != cpu_to_le32(OPENIBNAL_MSG_MAGIC)) {
1924 CERROR ("Can't accept LID %04x: bad magic %08x\n",
1925 req->dlid, le32_to_cpu(wcr->wcr_magic));
1926 return TS_IB_CM_CALLBACK_ABORT;
1929 if (wcr->wcr_version != cpu_to_le16(OPENIBNAL_MSG_VERSION)) {
/* report the version that failed the check, not the magic */
1930 CERROR ("Can't accept LID %04x: bad version %d\n",
1931 req->dlid, le16_to_cpu(wcr->wcr_version));
1932 return TS_IB_CM_CALLBACK_ABORT;
1935 rc = koibnal_accept(&conn,
1937 le64_to_cpu(wcr->wcr_nid),
1938 le64_to_cpu(wcr->wcr_incarnation),
1939 le16_to_cpu(wcr->wcr_queue_depth));
1941 CERROR ("Can't accept "LPX64": %d\n",
1942 le64_to_cpu(wcr->wcr_nid), rc);
1943 return TS_IB_CM_CALLBACK_ABORT;
1946 /* update 'arg' for next callback */
1947 rc = tsIbCmCallbackModify(cid,
1948 koibnal_passive_conn_callback, conn);
1951 req->accept_param.qp = conn->ibc_qp;
/* wcr_queue_depth is handled as a 16-bit little-endian field
 * everywhere else (le16_to_cpu above, cpu_to_le16 on the active
 * side), so encode it the same way here */
1952 *((koib_wire_connreq_t *)req->accept_param.reply_private_data)
1953 = (koib_wire_connreq_t) {
1954 .wcr_magic = cpu_to_le32(OPENIBNAL_MSG_MAGIC),
1955 .wcr_version = cpu_to_le16(OPENIBNAL_MSG_VERSION),
1956 .wcr_queue_depth = cpu_to_le16(OPENIBNAL_MSG_QUEUE_SIZE),
1957 .wcr_nid = cpu_to_le64(koibnal_data.koib_nid),
1958 .wcr_incarnation = cpu_to_le64(koibnal_data.koib_incarnation),
1960 req->accept_param.reply_private_data_len = sizeof(koib_wire_connreq_t);
1961 req->accept_param.responder_resources = OPENIBNAL_RESPONDER_RESOURCES;
1962 req->accept_param.initiator_depth = OPENIBNAL_RESPONDER_RESOURCES;
1963 req->accept_param.rnr_retry_count = OPENIBNAL_RNR_RETRY;
1964 req->accept_param.flow_control = OPENIBNAL_FLOW_CONTROL;
1966 CDEBUG(D_NET, "Proceeding\n");
1970 case TS_IB_CM_ESTABLISHED:
1971 LASSERT (conn != NULL);
1972 CDEBUG(D_WARNING, "Connection %p -> "LPX64" ESTABLISHED.\n",
1973 conn, conn->ibc_peer->ibp_nid);
1975 koibnal_connreq_done (conn, 0, 0);
1979 /* NB if the connreq is done, we switch to koibnal_conn_callback */
1980 return TS_IB_CM_CALLBACK_PROCEED;
/* CM callback for the active (initiating) side of connection setup;
 * ib_cm_connect() is given this callback in koibnal_pathreq_callback().
 * 'arg' is the koib_conn_t.
 *
 * TS_IB_CM_REP_RECEIVED: the peer's wire connreq in the reply is
 *   checked for length, magic, version, queue depth and NID; any
 *   mismatch completes the connreq with -EPROTO. On success the peer's
 *   incarnation is recorded and the conn gets a full set of credits.
 * TS_IB_CM_ESTABLISHED: the connreq is completed successfully.
 * Idle report: the conn state is put back to INIT_QP and the connreq
 *   is completed with -ECONNABORTED.
 * Other events: logged and the connreq completed with -ECONNABORTED.
 *
 * Returns TS_IB_CM_CALLBACK_PROCEED. */
1983 tTS_IB_CM_CALLBACK_RETURN
1984 koibnal_active_conn_callback (tTS_IB_CM_EVENT event,
1985 tTS_IB_CM_COMM_ID cid,
1989 koib_conn_t *conn = arg;
1992 case TS_IB_CM_REP_RECEIVED: {
1993 struct ib_cm_rep_received_param *rep = param;
1994 koib_wire_connreq_t *wcr = rep->remote_private_data;
1996 if (rep->remote_private_data_len < sizeof (*wcr)) {
1997 CERROR ("Short reply from "LPX64": %d\n",
1998 conn->ibc_peer->ibp_nid,
1999 rep->remote_private_data_len);
2000 koibnal_connreq_done (conn, 1, -EPROTO);
2004 if (wcr->wcr_magic != cpu_to_le32(OPENIBNAL_MSG_MAGIC)) {
2005 CERROR ("Can't connect "LPX64": bad magic %08x\n",
2006 conn->ibc_peer->ibp_nid, le32_to_cpu(wcr->wcr_magic));
2007 koibnal_connreq_done (conn, 1, -EPROTO);
2011 if (wcr->wcr_version != cpu_to_le16(OPENIBNAL_MSG_VERSION)) {
/* report the version that failed the check, not the magic */
2012 CERROR ("Can't connect "LPX64": bad version %d\n",
2013 conn->ibc_peer->ibp_nid, le16_to_cpu(wcr->wcr_version));
2014 koibnal_connreq_done (conn, 1, -EPROTO);
2018 if (wcr->wcr_queue_depth != cpu_to_le16(OPENIBNAL_MSG_QUEUE_SIZE)) {
2019 CERROR ("Can't connect "LPX64": bad queue depth %d\n",
2020 conn->ibc_peer->ibp_nid, le16_to_cpu(wcr->wcr_queue_depth));
2021 koibnal_connreq_done (conn, 1, -EPROTO);
2025 if (le64_to_cpu(wcr->wcr_nid) != conn->ibc_peer->ibp_nid) {
2026 CERROR ("Unexpected NID "LPX64" from "LPX64"\n",
2027 le64_to_cpu(wcr->wcr_nid), conn->ibc_peer->ibp_nid);
2028 koibnal_connreq_done (conn, 1, -EPROTO);
2032 CDEBUG(D_NET, "Connection %p -> "LPX64" REP_RECEIVED.\n",
2033 conn, conn->ibc_peer->ibp_nid);
2035 conn->ibc_incarnation = le64_to_cpu(wcr->wcr_incarnation);
2036 conn->ibc_credits = OPENIBNAL_MSG_QUEUE_SIZE;
2040 case TS_IB_CM_ESTABLISHED:
2041 CDEBUG(D_WARNING, "Connection %p -> "LPX64" Established\n",
2042 conn, conn->ibc_peer->ibp_nid);
2044 koibnal_connreq_done (conn, 1, 0);
2048 CERROR("Connection %p -> "LPX64" IDLE\n",
2049 conn, conn->ibc_peer->ibp_nid);
2050 /* Back out state change: I'm disengaged from CM */
2051 conn->ibc_state = OPENIBNAL_CONN_INIT_QP;
2053 koibnal_connreq_done (conn, 1, -ECONNABORTED);
2057 CERROR("Connection %p -> "LPX64" ERROR %d\n",
2058 conn, conn->ibc_peer->ibp_nid, event);
2059 koibnal_connreq_done (conn, 1, -ECONNABORTED);
2063 /* NB if the connreq is done, we switch to koibnal_conn_callback */
2064 return TS_IB_CM_CALLBACK_PROCEED;
/* Completion callback for the path record query issued by
 * koibnal_service_get_callback(). 'arg' is the koib_conn_t.
 *
 * A failed query is logged and the connreq completed with 'status'.
 * Otherwise the path record is saved in the connreq, the wire connreq
 * and the CM active-connect parameters are filled in, the conn is
 * marked CONNECTING and ib_cm_connect() is called with
 * koibnal_active_conn_callback. If ib_cm_connect() fails, the state is
 * put back to INIT_QP and the connreq completed with that error. */
2068 koibnal_pathreq_callback (tTS_IB_CLIENT_QUERY_TID tid, int status,
2069 struct ib_path_record *resp, int remaining,
2072 koib_conn_t *conn = arg;
2075 CERROR ("status %d\n", status);
2076 koibnal_connreq_done (conn, 1, status);
2080 conn->ibc_connreq->cr_path = *resp;
2082 conn->ibc_connreq->cr_wcr = (koib_wire_connreq_t) {
2083 .wcr_magic = cpu_to_le32(OPENIBNAL_MSG_MAGIC),
2084 .wcr_version = cpu_to_le16(OPENIBNAL_MSG_VERSION),
2085 .wcr_queue_depth = cpu_to_le16(OPENIBNAL_MSG_QUEUE_SIZE),
2086 .wcr_nid = cpu_to_le64(koibnal_data.koib_nid),
2087 .wcr_incarnation = cpu_to_le64(koibnal_data.koib_incarnation),
2090 conn->ibc_connreq->cr_connparam = (struct ib_cm_active_param) {
2092 .req_private_data = &conn->ibc_connreq->cr_wcr,
2093 .req_private_data_len = sizeof(conn->ibc_connreq->cr_wcr),
2094 .responder_resources = OPENIBNAL_RESPONDER_RESOURCES,
2095 .initiator_depth = OPENIBNAL_RESPONDER_RESOURCES,
2096 .retry_count = OPENIBNAL_RETRY,
2097 .rnr_retry_count = OPENIBNAL_RNR_RETRY,
2098 .cm_response_timeout = koibnal_tunables.koib_io_timeout,
2099 .max_cm_retries = OPENIBNAL_CM_RETRY,
2100 .flow_control = OPENIBNAL_FLOW_CONTROL,
2103 /* XXX set timeout just like SDP!!!*/
2104 conn->ibc_connreq->cr_path.packet_life = 13;
2106 /* Flag I'm getting involved with the CM... */
2107 conn->ibc_state = OPENIBNAL_CONN_CONNECTING;
2109 CDEBUG(D_NET, "Connecting to, service id "LPX64", on "LPX64"\n",
2110 conn->ibc_connreq->cr_service.service_id,
2111 *koibnal_service_nid_field(&conn->ibc_connreq->cr_service));
2113 /* koibnal_connect_callback gets my conn ref */
2114 status = ib_cm_connect (&conn->ibc_connreq->cr_connparam,
2115 &conn->ibc_connreq->cr_path, NULL,
2116 conn->ibc_connreq->cr_service.service_id, 0,
2117 koibnal_active_conn_callback, conn,
2118 &conn->ibc_comm_id);
2120 CERROR ("Connect: %d\n", status);
2121 /* Back out state change: I've not got a CM comm_id yet... */
2122 conn->ibc_state = OPENIBNAL_CONN_INIT_QP;
2123 koibnal_connreq_done (conn, 1, status);
2127 /* return non-zero to prevent further callbacks */
/* Completion callback for the service lookup issued by
 * koibnal_connect_peer(). 'arg' is the koib_conn_t.
 *
 * A failed lookup is logged and the connreq completed with 'status'.
 * Otherwise the service record is saved in the connreq, the local GID
 * is fetched (asserted to succeed) and a path record request to the
 * service's GID is issued with koibnal_pathreq_callback as its
 * completion. A failure to issue that request also completes the
 * connreq with the error. */
2132 koibnal_service_get_callback (tTS_IB_CLIENT_QUERY_TID tid, int status,
2133 struct ib_common_attrib_service *resp, void *arg)
2135 koib_conn_t *conn = arg;
2138 CERROR ("status %d\n", status);
2139 koibnal_connreq_done (conn, 1, status);
2143 CDEBUG(D_NET, "Got status %d, service id "LPX64", on "LPX64"\n",
2144 status, resp->service_id,
2145 *koibnal_service_nid_field(resp));
2147 conn->ibc_connreq->cr_service = *resp;
2149 status = ib_cached_gid_get(koibnal_data.koib_device,
2150 koibnal_data.koib_port, 0,
2151 conn->ibc_connreq->cr_gid);
2152 LASSERT (status == 0);
2154 /* koibnal_pathreq_callback gets my conn ref */
2155 status = tsIbPathRecordRequest (koibnal_data.koib_device,
2156 koibnal_data.koib_port,
2157 conn->ibc_connreq->cr_gid,
2158 conn->ibc_connreq->cr_service.service_gid,
2159 conn->ibc_connreq->cr_service.service_pkey,
2161 koibnal_tunables.koib_io_timeout * HZ,
2163 koibnal_pathreq_callback, conn,
2164 &conn->ibc_connreq->cr_tid);
2169 CERROR ("Path record request: %d\n", status);
2170 koibnal_connreq_done (conn, 1, status);
/* Start an active connection attempt to 'peer' (which must already
 * have ibp_connecting raised).
 *
 * Creates a conn that takes a ref on the peer, allocates and zeroes
 * its connreq, sets the service lookup keys from the peer's NID and
 * issues ib_service_get() with koibnal_service_get_callback as the
 * completion. Failure to create the conn is reported through
 * koibnal_peer_connect_failed(); later failures go through
 * koibnal_connreq_done().
 *
 * NOTE(review): when the connreq allocation fails,
 * koibnal_connreq_done() is called with active == 1 and a NULL
 * ibc_connreq, which looks like it trips that function's LASSERT that
 * active conns have a connreq - confirm. */
2174 koibnal_connect_peer (koib_peer_t *peer)
2176 koib_conn_t *conn = koibnal_create_conn();
2179 LASSERT (peer->ibp_connecting != 0);
2182 CERROR ("Can't allocate conn\n");
2183 koibnal_peer_connect_failed (peer, 1, -ENOMEM);
2187 conn->ibc_peer = peer;
2188 atomic_inc (&peer->ibp_refcount);
2190 PORTAL_ALLOC (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
2191 if (conn->ibc_connreq == NULL) {
2192 CERROR ("Can't allocate connreq\n");
2193 koibnal_connreq_done (conn, 1, -ENOMEM);
2197 memset(conn->ibc_connreq, 0, sizeof (*conn->ibc_connreq));
2199 koibnal_set_service_keys(&conn->ibc_connreq->cr_service, peer->ibp_nid);
2201 /* koibnal_service_get_callback gets my conn ref */
2202 rc = ib_service_get (koibnal_data.koib_device,
2203 koibnal_data.koib_port,
2204 &conn->ibc_connreq->cr_service,
2205 KOIBNAL_SERVICE_KEY_MASK,
2206 koibnal_tunables.koib_io_timeout * HZ,
2207 koibnal_service_get_callback, conn,
2208 &conn->ibc_connreq->cr_tid);
2213 CERROR ("ib_service_get: %d\n", rc);
2214 koibnal_connreq_done (conn, 1, rc);
/* Walk conn's ibc_rdma_queue under ibc_lock (irqs saved) looking for a
 * passive RDMA whose tx_passive_rdma_deadline has been reached.
 * koibnal_check_conns() treats a non-zero result as "this conn has a
 * timed-out RDMA". */
2218 koibnal_conn_timed_out (koib_conn_t *conn)
2221 struct list_head *ttmp;
2222 unsigned long flags;
2225 spin_lock_irqsave (&conn->ibc_lock, flags);
2227 list_for_each (ttmp, &conn->ibc_rdma_queue) {
2228 tx = list_entry (ttmp, koib_tx_t, tx_list);
2230 LASSERT (tx->tx_passive_rdma);
2231 LASSERT (tx->tx_passive_rdma_wait);
/* time_after_eq() keeps the comparison with jiffies wrap-safe */
2233 if (time_after_eq (jiffies, tx->tx_passive_rdma_deadline)) {
2238 spin_unlock_irqrestore (&conn->ibc_lock, flags);
/* Check every conn of every peer in bucket 'idx' of the peer hash
 * table (koibnal_data.koib_peers), holding koib_global_lock shared.
 *
 * For each conn (all must be ESTABLISHED) koibnal_check_sends() is
 * called, then koibnal_conn_timed_out(). A conn flagged as timed out
 * gets an extra ref, the lock is dropped, the timeout is logged and
 * the conn is closed with -ETIMEDOUT before the extra ref is released;
 * the scan then starts over because the lock was dropped. */
2244 koibnal_check_conns (int idx)
2246 struct list_head *peers = &koibnal_data.koib_peers[idx];
2247 struct list_head *ptmp;
2250 struct list_head *ctmp;
2253 /* NB. We expect to have a look at all the peers and not find any
2254 * rdmas to time out, so we just use a shared lock while we
2256 read_lock (&koibnal_data.koib_global_lock);
2258 list_for_each (ptmp, peers) {
2259 peer = list_entry (ptmp, koib_peer_t, ibp_list);
2261 list_for_each (ctmp, &peer->ibp_conns) {
2262 conn = list_entry (ctmp, koib_conn_t, ibc_list);
2264 LASSERT (conn->ibc_state == OPENIBNAL_CONN_ESTABLISHED);
2266 /* In case we have enough credits to return via a
2267 * NOOP, but there were no non-blocking tx descs
2268 * free to do it last time... */
2269 koibnal_check_sends(conn);
2271 if (!koibnal_conn_timed_out(conn))
2274 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
2275 conn, conn->ibc_state, peer->ibp_nid,
2276 atomic_read (&conn->ibc_refcount));
/* hold a ref across the unlocked close below */
2278 atomic_inc (&conn->ibc_refcount);
2279 read_unlock (&koibnal_data.koib_global_lock);
2281 CERROR("Timed out RDMA with "LPX64"\n",
2284 koibnal_close_conn (conn, -ETIMEDOUT);
2285 koibnal_put_conn (conn);
2287 /* start again now I've dropped the lock */
2292 read_unlock (&koibnal_data.koib_global_lock);
/* Tear down a conn that koibnal_close_conn_locked() put on death row;
 * called from koibnal_connd().
 *
 * The conn moves from DEATHROW to ZOMBIE and the CM is asked to
 * disconnect it (a failure is only logged). Every tx still waiting on
 * ibc_rdma_queue is removed, has tx_passive_rdma_wait cleared and
 * tx_status set to -ECONNABORTED; 'done' records whether the tx had no
 * sends outstanding, and koibnal_tx_done() is called with ibc_lock
 * dropped. Finally koibnal_check_sends() completes blocked transmits. */
2296 koibnal_terminate_conn (koib_conn_t *conn)
2298 unsigned long flags;
2302 CDEBUG(D_NET, "conn %p\n", conn);
2303 LASSERT (conn->ibc_state == OPENIBNAL_CONN_DEATHROW);
2304 conn->ibc_state = OPENIBNAL_CONN_ZOMBIE;
2306 rc = ib_cm_disconnect (conn->ibc_comm_id);
2308 CERROR ("Error %d disconnecting conn %p -> "LPX64"\n",
2309 rc, conn, conn->ibc_peer->ibp_nid);
2311 /* complete blocked passive RDMAs */
2312 spin_lock_irqsave (&conn->ibc_lock, flags);
2314 while (!list_empty (&conn->ibc_rdma_queue)) {
2315 koib_tx_t *tx = list_entry (conn->ibc_rdma_queue.next,
2316 koib_tx_t, tx_list);
2318 LASSERT (tx->tx_passive_rdma);
2319 LASSERT (tx->tx_passive_rdma_wait);
2321 list_del (&tx->tx_list);
2323 tx->tx_passive_rdma_wait = 0;
2324 done = (tx->tx_sending == 0);
2326 tx->tx_status = -ECONNABORTED;
/* ibc_lock is released around koibnal_tx_done() and retaken
 * before the queue is examined again */
2328 spin_unlock_irqrestore (&conn->ibc_lock, flags);
2331 koibnal_tx_done (tx);
2333 spin_lock_irqsave (&conn->ibc_lock, flags);
2336 spin_unlock_irqrestore (&conn->ibc_lock, flags);
2338 /* Complete all blocked transmits */
2339 koibnal_check_sends(conn);
/* Connection daemon thread body.
 *
 * Working under koib_connd_lock (dropped around each piece of work):
 *  - conns queued on koib_connd_conns are taken one at a time: a
 *    DEATHROW conn is terminated and has a ref dropped, a ZOMBIE conn
 *    is destroyed, any other state is logged as an error;
 *  - peers queued on koib_connd_peers have a connection attempt
 *    started with koibnal_connect_peer() and a ref dropped;
 *  - once 'deadline' has passed, koibnal_check_conns() is run over a
 *    chunk of the peer hash table, resuming at 'peer_index';
 *  - otherwise the thread sleeps on koib_connd_waitq for up to
 *    'timeout' jiffies, unless shutdown is flagged or work is queued.
 * The shutdown test is koib_shutdown set with koib_nconns at zero.
 * koibnal_thread_fini() is called on the way out. */
2343 koibnal_connd (void *arg)
2346 unsigned long flags;
2352 unsigned long deadline = jiffies;
2354 kportal_daemonize ("koibnal_connd");
2355 kportal_blockallsigs ();
2357 init_waitqueue_entry (&wait, current);
2359 spin_lock_irqsave (&koibnal_data.koib_connd_lock, flags);
2362 if (!list_empty (&koibnal_data.koib_connd_conns)) {
2363 conn = list_entry (koibnal_data.koib_connd_conns.next,
2364 koib_conn_t, ibc_list);
2365 list_del (&conn->ibc_list);
2367 spin_unlock_irqrestore (&koibnal_data.koib_connd_lock, flags);
2369 switch (conn->ibc_state) {
2370 case OPENIBNAL_CONN_DEATHROW:
2371 LASSERT (conn->ibc_comm_id != TS_IB_CM_COMM_ID_INVALID);
2372 /* Disconnect: conn becomes a zombie in the
2373 * callback and last ref reschedules it
2375 koibnal_terminate_conn(conn);
2376 koibnal_put_conn (conn);
2379 case OPENIBNAL_CONN_ZOMBIE:
2380 koibnal_destroy_conn (conn);
2384 CERROR ("Bad conn %p state: %d\n",
2385 conn, conn->ibc_state);
2389 spin_lock_irqsave (&koibnal_data.koib_connd_lock, flags);
2393 if (!list_empty (&koibnal_data.koib_connd_peers)) {
2394 peer = list_entry (koibnal_data.koib_connd_peers.next,
2395 koib_peer_t, ibp_connd_list);
2397 list_del_init (&peer->ibp_connd_list);
2398 spin_unlock_irqrestore (&koibnal_data.koib_connd_lock, flags);
2400 koibnal_connect_peer (peer);
2401 koibnal_put_peer (peer);
2403 spin_lock_irqsave (&koibnal_data.koib_connd_lock, flags);
2406 /* shut down and nobody left to reap... */
2407 if (koibnal_data.koib_shutdown &&
2408 atomic_read(&koibnal_data.koib_nconns) == 0)
2411 spin_unlock_irqrestore (&koibnal_data.koib_connd_lock, flags);
2413 /* careful with the jiffy wrap... */
2414 while ((timeout = (int)(deadline - jiffies)) <= 0) {
2417 int chunk = koibnal_data.koib_peer_hash_size;
2419 /* Time to check for RDMA timeouts on a few more
2420 * peers: I do checks every 'p' seconds on a
2421 * proportion of the peer table and I need to check
2422 * every connection 'n' times within a timeout
2423 * interval, to ensure I detect a timeout on any
2424 * connection within (n+1)/n times the timeout
2427 if (koibnal_tunables.koib_io_timeout > n * p)
2428 chunk = (chunk * n * p) /
2429 koibnal_tunables.koib_io_timeout;
2433 for (i = 0; i < chunk; i++) {
2434 koibnal_check_conns (peer_index);
2435 peer_index = (peer_index + 1) %
2436 koibnal_data.koib_peer_hash_size;
2442 koibnal_data.koib_connd_waketime = jiffies + timeout;
2444 set_current_state (TASK_INTERRUPTIBLE);
2445 add_wait_queue (&koibnal_data.koib_connd_waitq, &wait);
2447 if (!koibnal_data.koib_shutdown &&
2448 list_empty (&koibnal_data.koib_connd_conns) &&
2449 list_empty (&koibnal_data.koib_connd_peers))
2450 schedule_timeout (timeout);
2452 set_current_state (TASK_RUNNING);
2453 remove_wait_queue (&koibnal_data.koib_connd_waitq, &wait);
2455 spin_lock_irqsave (&koibnal_data.koib_connd_lock, flags);
2458 spin_unlock_irqrestore (&koibnal_data.koib_connd_lock, flags);
2460 koibnal_thread_fini ();
/* Scheduler thread body; 'arg' carries the thread's numeric id, which
 * is used to name it "koibnal_sd_NN".
 *
 * Working under koib_sched_lock (dropped around each piece of work):
 *  - every tx on koib_sched_txq is completed with koibnal_tx_done()
 *    (koibnal_schedule_tx_done() is what queues them there);
 *  - one rx at a time is taken off koib_sched_rxq;
 *  - with nothing done, the thread waits on koib_sched_waitq until
 *    either queue is non-empty or shutdown is flagged with no conns
 *    left; the lock is also released after OPENIBNAL_RESCHED
 *    consecutive busy passes.
 * The shutdown test is koib_shutdown set with koib_nconns at zero.
 * koibnal_thread_fini() is called on the way out. */
2465 koibnal_scheduler(void *arg)
2467 long id = (long)arg;
2471 unsigned long flags;
2476 snprintf(name, sizeof(name), "koibnal_sd_%02ld", id);
2477 kportal_daemonize(name);
2478 kportal_blockallsigs();
2480 spin_lock_irqsave(&koibnal_data.koib_sched_lock, flags);
/* drain the deferred tx completions first */
2485 while (!list_empty(&koibnal_data.koib_sched_txq)) {
2486 tx = list_entry(koibnal_data.koib_sched_txq.next,
2487 koib_tx_t, tx_list);
2488 list_del(&tx->tx_list);
2489 spin_unlock_irqrestore(&koibnal_data.koib_sched_lock,
2491 koibnal_tx_done(tx);
2493 spin_lock_irqsave(&koibnal_data.koib_sched_lock,
2497 if (!list_empty(&koibnal_data.koib_sched_rxq)) {
2498 rx = list_entry(koibnal_data.koib_sched_rxq.next,
2499 koib_rx_t, rx_list);
2500 list_del(&rx->rx_list);
2501 spin_unlock_irqrestore(&koibnal_data.koib_sched_lock,
2507 spin_lock_irqsave(&koibnal_data.koib_sched_lock,
2511 /* shut down and no receives to complete... */
2512 if (koibnal_data.koib_shutdown &&
2513 atomic_read(&koibnal_data.koib_nconns) == 0)
2516 /* nothing to do or hogging CPU */
2517 if (!did_something || counter++ == OPENIBNAL_RESCHED) {
2518 spin_unlock_irqrestore(&koibnal_data.koib_sched_lock,
2522 if (!did_something) {
2523 rc = wait_event_interruptible(
2524 koibnal_data.koib_sched_waitq,
2525 !list_empty(&koibnal_data.koib_sched_txq) ||
2526 !list_empty(&koibnal_data.koib_sched_rxq) ||
2527 (koibnal_data.koib_shutdown &&
2528 atomic_read (&koibnal_data.koib_nconns) == 0));
2533 spin_lock_irqsave(&koibnal_data.koib_sched_lock,
2538 spin_unlock_irqrestore(&koibnal_data.koib_sched_lock, flags);
2540 koibnal_thread_fini();
2545 lib_nal_t koibnal_lib = {
2546 libnal_data: &koibnal_data, /* NAL private data */
2547 libnal_send: koibnal_send,
2548 libnal_send_pages: koibnal_send_pages,
2549 libnal_recv: koibnal_recv,
2550 libnal_recv_pages: koibnal_recv_pages,
2551 libnal_dist: koibnal_dist