X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lnet%2Fklnds%2Fopeniblnd%2Fopeniblnd_cb.c;h=20ce962dfcdbbb14944dbef97253ed881065c70d;hp=34d465f83675fcd2053541fac2bc3cbc9c1cd849;hb=ebb833a501e0141053bf942efadf8b4bfde92000;hpb=ea06e3ca93ee7eb8504677cf7eb2afffe6574d56

diff --git a/lnet/klnds/openiblnd/openiblnd_cb.c b/lnet/klnds/openiblnd/openiblnd_cb.c
index 34d465f..20ce962 100644
--- a/lnet/klnds/openiblnd/openiblnd_cb.c
+++ b/lnet/klnds/openiblnd/openiblnd_cb.c
@@ -1,27 +1,44 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * Copyright (C) 2004 Cluster File Systems, Inc.
- * Author: Eric Barton
+ * GPL HEADER START
  *
- * This file is part of Lustre, http://www.lustre.org.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- * Lustre is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- * Lustre is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- * You should have received a copy of the GNU General Public License
- * along with Lustre; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lnet/klnds/openiblnd/openiblnd_cb.c
+ *
+ * Author: Eric Barton
  */
 
-#include "openibnal.h"
+#include "openiblnd.h"
 
 /*
  *  LIB functions follow
  *
  */
 
@@ -43,7 +60,7 @@ kibnal_schedule_tx_done (kib_tx_t *tx)
 void
 kibnal_tx_done (kib_tx_t *tx)
 {
-        ptl_err_t        ptlrc = (tx->tx_status == 0) ? PTL_OK : PTL_FAIL;
+        lnet_msg_t      *lntmsg[2];
         unsigned long    flags;
         int              i;
         int              rc;
@@ -51,6 +68,12 @@ kibnal_tx_done (kib_tx_t *tx)
         LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting callback */
         LASSERT (!tx->tx_passive_rdma_wait);    /* mustn't be awaiting RDMA */
 
+        if (in_interrupt()) {
+                /* can't deregister memory/flush FMRs/finalize in IRQ context... */
+                kibnal_schedule_tx_done(tx);
+                return;
+        }
+
         switch (tx->tx_mapped) {
         default:
                 LBUG();
@@ -59,11 +82,6 @@
                 break;
 
         case KIB_TX_MAPPED:
-                if (in_interrupt()) {
-                        /* can't deregister memory in IRQ context... 
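
The in_interrupt() test added above is the whole point of kibnal_schedule_tx_done(): completions can fire in IRQ context, where deregistering memory, flushing FMRs, or finalizing messages would sleep, so the tx is parked for a scheduler thread instead. A toy user-space model of that split (all names invented for illustration; this is not the kernel API):

#include <stdio.h>

/* Model of the kibnal_tx_done()/kibnal_schedule_tx_done() split: work
 * that may sleep is handed to a scheduler thread when the completion
 * fires in interrupt context. */
static int in_interrupt_ctx;            /* stand-in for in_interrupt() */

static void schedule_tx_done(void)
{
        printf("queued for the scheduler thread\n");
}

static void tx_done(void)
{
        if (in_interrupt_ctx) {
                schedule_tx_done();     /* can't block here; retry later */
                return;
        }
        printf("unmapped, finalized, returned tx to the pool\n");
}

int main(void)
{
        in_interrupt_ctx = 1;
        tx_done();                      /* completion in IRQ: deferred */
        in_interrupt_ctx = 0;
        tx_done();                      /* thread context: done inline */
        return 0;
}
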
*/ - kibnal_schedule_tx_done(tx); - return; - } rc = ib_memory_deregister(tx->tx_md.md_handle.mr); LASSERT (rc == 0); tx->tx_mapped = KIB_TX_UNMAPPED; @@ -71,33 +89,27 @@ kibnal_tx_done (kib_tx_t *tx) #if IBNAL_FMR case KIB_TX_MAPPED_FMR: - if (in_interrupt() && tx->tx_status != 0) { - /* can't flush FMRs in IRQ context... */ - kibnal_schedule_tx_done(tx); - return; - } - rc = ib_fmr_deregister(tx->tx_md.md_handle.fmr); LASSERT (rc == 0); +#ifndef USING_TSAPI + /* Somewhat belt-and-braces since the tx's conn has closed if + * this was a passive RDMA waiting to complete... */ if (tx->tx_status != 0) ib_fmr_pool_force_flush(kibnal_data.kib_fmr_pool); +#endif tx->tx_mapped = KIB_TX_UNMAPPED; break; #endif } - for (i = 0; i < 2; i++) { - /* tx may have up to 2 libmsgs to finalise */ - if (tx->tx_libmsg[i] == NULL) - continue; + /* tx may have up to 2 ptlmsgs to finalise */ + lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL; + lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL; + rc = tx->tx_status; - lib_finalize (&kibnal_lib, NULL, tx->tx_libmsg[i], ptlrc); - tx->tx_libmsg[i] = NULL; - } - if (tx->tx_conn != NULL) { - kibnal_put_conn (tx->tx_conn); + kibnal_conn_decref(tx->tx_conn); tx->tx_conn = NULL; } @@ -107,88 +119,53 @@ kibnal_tx_done (kib_tx_t *tx) spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags); - if (tx->tx_isnblk) { - list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_nblk_txs); - } else { - list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_txs); - wake_up (&kibnal_data.kib_idle_tx_waitq); - } + list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_txs); spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags); + + /* delay finalize until my descs have been freed */ + for (i = 0; i < 2; i++) { + if (lntmsg[i] == NULL) + continue; + + lnet_finalize (kibnal_data.kib_ni, lntmsg[i], rc); + } } kib_tx_t * -kibnal_get_idle_tx (int may_block) +kibnal_get_idle_tx (void) { unsigned long flags; - kib_tx_t *tx = NULL; + kib_tx_t *tx; - for (;;) { - spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags); - - /* "normal" descriptor is free */ - if (!list_empty (&kibnal_data.kib_idle_txs)) { - tx = list_entry (kibnal_data.kib_idle_txs.next, - kib_tx_t, tx_list); - break; - } - - if (!may_block) { - /* may dip into reserve pool */ - if (list_empty (&kibnal_data.kib_idle_nblk_txs)) { - CERROR ("reserved tx desc pool exhausted\n"); - break; - } - - tx = list_entry (kibnal_data.kib_idle_nblk_txs.next, - kib_tx_t, tx_list); - break; - } + spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags); - /* block for idle tx */ + if (list_empty (&kibnal_data.kib_idle_txs)) { spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags); - - wait_event (kibnal_data.kib_idle_tx_waitq, - !list_empty (&kibnal_data.kib_idle_txs) || - kibnal_data.kib_shutdown); + return NULL; } - if (tx != NULL) { - list_del (&tx->tx_list); + tx = list_entry (kibnal_data.kib_idle_txs.next, kib_tx_t, tx_list); + list_del (&tx->tx_list); - /* Allocate a new passive RDMA completion cookie. It might - * not be needed, but we've got a lock right now and we're - * unlikely to wrap... */ - tx->tx_passive_rdma_cookie = kibnal_data.kib_next_tx_cookie++; - - LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED); - LASSERT (tx->tx_nsp == 0); - LASSERT (tx->tx_sending == 0); - LASSERT (tx->tx_status == 0); - LASSERT (tx->tx_conn == NULL); - LASSERT (!tx->tx_passive_rdma); - LASSERT (!tx->tx_passive_rdma_wait); - LASSERT (tx->tx_libmsg[0] == NULL); - LASSERT (tx->tx_libmsg[1] == NULL); - } + /* Allocate a new passive RDMA completion cookie. 
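
The idle-tx handling above amounts to a free list under a single spinlock; as the comment here notes, the passive-RDMA cookie is assigned while the lock is still held, so the counter increment costs nothing extra. A self-contained user-space sketch of the same pattern (all names invented; a mutex stands in for the kernel spinlock):

#include <pthread.h>
#include <stdio.h>

struct demo_tx {
        struct demo_tx     *next;
        unsigned long long  cookie;
};

static struct demo_tx      pool[4];
static struct demo_tx     *idle_list;
static unsigned long long  next_cookie = 1;
static pthread_mutex_t     pool_lock = PTHREAD_MUTEX_INITIALIZER;

static struct demo_tx *get_idle_tx(void)
{
        struct demo_tx *tx;

        pthread_mutex_lock(&pool_lock);
        tx = idle_list;                      /* NULL if exhausted: caller copes */
        if (tx != NULL) {
                idle_list = tx->next;
                tx->cookie = next_cookie++;  /* cheap while the lock is held */
        }
        pthread_mutex_unlock(&pool_lock);
        return tx;
}

static void tx_done(struct demo_tx *tx)
{
        pthread_mutex_lock(&pool_lock);
        tx->next = idle_list;                /* back on the free list */
        idle_list = tx;
        pthread_mutex_unlock(&pool_lock);
}

int main(void)
{
        for (int i = 0; i < 4; i++)
                tx_done(&pool[i]);           /* populate the free list */

        struct demo_tx *tx = get_idle_tx();
        printf("got tx with cookie %llu\n", tx->cookie);
        tx_done(tx);
        return 0;
}
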
It might not be + * needed, but we've got a lock right now and we're unlikely to + * wrap... */ + tx->tx_passive_rdma_cookie = kibnal_data.kib_next_tx_cookie++; spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags); - - return (tx); -} - -int -kibnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist) -{ - /* I would guess that if kibnal_get_peer (nid) == NULL, - and we're not routing, then 'nid' is very distant :) */ - if ( nal->libnal_ni.ni_pid.nid == nid ) { - *dist = 0; - } else { - *dist = 1; - } - return 0; + LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED); + LASSERT (tx->tx_nsp == 0); + LASSERT (tx->tx_sending == 0); + LASSERT (tx->tx_status == 0); + LASSERT (tx->tx_conn == NULL); + LASSERT (!tx->tx_passive_rdma); + LASSERT (!tx->tx_passive_rdma_wait); + LASSERT (tx->tx_lntmsg[0] == NULL); + LASSERT (tx->tx_lntmsg[1] == NULL); + + return tx; } void @@ -215,6 +192,8 @@ kibnal_complete_passive_rdma(kib_conn_t *conn, __u64 cookie, int status) CDEBUG(D_NET, "Complete %p "LPD64": %d\n", tx, cookie, status); + /* XXX Set mlength of reply here */ + tx->tx_status = status; tx->tx_passive_rdma_wait = 0; idle = (tx->tx_sending == 0); @@ -233,17 +212,20 @@ kibnal_complete_passive_rdma(kib_conn_t *conn, __u64 cookie, int status) spin_unlock_irqrestore (&conn->ibc_lock, flags); - CERROR ("Unmatched (late?) RDMA completion "LPX64" from "LPX64"\n", - cookie, conn->ibc_peer->ibp_nid); + CERROR ("Unmatched (late?) RDMA completion "LPX64" from %s\n", + cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid)); } void -kibnal_post_rx (kib_rx_t *rx, int do_credits) +kibnal_post_rx (kib_rx_t *rx, int credit, int rsrvd_credit) { kib_conn_t *conn = rx->rx_conn; int rc; unsigned long flags; + LASSERT(!rsrvd_credit || + conn->ibc_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD); + rx->rx_gl = (struct ib_gather_scatter) { .address = rx->rx_vaddr, .length = IBNAL_MSG_SIZE, @@ -259,19 +241,24 @@ kibnal_post_rx (kib_rx_t *rx, int do_credits) }; LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED); - LASSERT (!rx->rx_posted); - rx->rx_posted = 1; + LASSERT (rx->rx_nob >= 0); /* not posted */ + rx->rx_nob = -1; /* is now */ mb(); if (conn->ibc_state != IBNAL_CONN_ESTABLISHED) rc = -ECONNABORTED; else - rc = ib_receive (conn->ibc_qp, &rx->rx_sp, 1); + rc = kibnal_ib_receive(conn->ibc_qp, &rx->rx_sp); if (rc == 0) { - if (do_credits) { + if (credit || rsrvd_credit) { spin_lock_irqsave(&conn->ibc_lock, flags); - conn->ibc_outstanding_credits++; + + if (credit) + conn->ibc_outstanding_credits++; + if (rsrvd_credit) + conn->ibc_reserved_credits++; + spin_unlock_irqrestore(&conn->ibc_lock, flags); kibnal_check_sends(conn); @@ -280,16 +267,16 @@ kibnal_post_rx (kib_rx_t *rx, int do_credits) } if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) { - CERROR ("Error posting receive -> "LPX64": %d\n", - conn->ibc_peer->ibp_nid, rc); + CERROR ("Error posting receive -> %s: %d\n", + libcfs_nid2str(conn->ibc_peer->ibp_nid), rc); kibnal_close_conn (rx->rx_conn, rc); } else { - CDEBUG (D_NET, "Error posting receive -> "LPX64": %d\n", - conn->ibc_peer->ibp_nid, rc); + CDEBUG (D_NET, "Error posting receive -> %s: %d\n", + libcfs_nid2str(conn->ibc_peer->ibp_nid), rc); } /* Drop rx's ref */ - kibnal_put_conn (conn); + kibnal_conn_decref(conn); } void @@ -301,10 +288,11 @@ kibnal_rx_callback (struct ib_cq_entry *e) int credits; unsigned long flags; int rc; + int err = -ECONNABORTED; CDEBUG (D_NET, "rx %p conn %p\n", rx, conn); - LASSERT (rx->rx_posted); - rx->rx_posted = 0; + LASSERT (rx->rx_nob < 0); /* was posted */ + rx->rx_nob = 0; /* isn't 
now */ mb(); /* receives complete with error in any case after we've started @@ -316,24 +304,29 @@ kibnal_rx_callback (struct ib_cq_entry *e) LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED); if (e->status != IB_COMPLETION_STATUS_SUCCESS) { - CERROR("Rx from "LPX64" failed: %d\n", - conn->ibc_peer->ibp_nid, e->status); + CERROR("Rx from %s failed: %d\n", + libcfs_nid2str(conn->ibc_peer->ibp_nid), e->status); goto failed; } - rc = kibnal_unpack_msg(msg, e->bytes_transferred); + LASSERT (e->bytes_transferred >= 0); + rx->rx_nob = e->bytes_transferred; + mb(); + + rc = kibnal_unpack_msg(msg, conn->ibc_version, rx->rx_nob); if (rc != 0) { - CERROR ("Error %d unpacking rx from "LPX64"\n", - rc, conn->ibc_peer->ibp_nid); + CERROR ("Error %d unpacking rx from %s\n", + rc, libcfs_nid2str(conn->ibc_peer->ibp_nid)); goto failed; } - if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid || + if (conn->ibc_peer->ibp_nid != msg->ibm_srcnid || + kibnal_data.kib_ni->ni_nid != msg->ibm_dstnid || msg->ibm_srcstamp != conn->ibc_incarnation || - msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid || msg->ibm_dststamp != kibnal_data.kib_incarnation) { - CERROR ("Stale rx from "LPX64"\n", - conn->ibc_peer->ibp_nid); + CERROR ("Stale rx from %s\n", + libcfs_nid2str(conn->ibc_peer->ibp_nid)); + err = -ESTALE; goto failed; } @@ -349,7 +342,7 @@ kibnal_rx_callback (struct ib_cq_entry *e) switch (msg->ibm_type) { case IBNAL_MSG_NOOP: - kibnal_post_rx (rx, 1); + kibnal_post_rx (rx, 1, 0); return; case IBNAL_MSG_IMMEDIATE: @@ -373,15 +366,23 @@ kibnal_rx_callback (struct ib_cq_entry *e) kibnal_complete_passive_rdma (conn, msg->ibm_u.completion.ibcm_cookie, msg->ibm_u.completion.ibcm_status); - kibnal_post_rx (rx, 1); + + if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) { + kibnal_post_rx (rx, 1, 0); + } else { + /* this reply buffer was pre-reserved */ + kibnal_post_rx (rx, 0, 1); + } return; default: - CERROR ("Bad msg type %x from "LPX64"\n", - msg->ibm_type, conn->ibc_peer->ibp_nid); + CERROR ("Bad msg type %x from %s\n", + msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid)); goto failed; } + kibnal_peer_alive(conn->ibc_peer); + /* schedule for kibnal_rx() in thread context */ spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags); @@ -393,61 +394,43 @@ kibnal_rx_callback (struct ib_cq_entry *e) failed: CDEBUG(D_NET, "rx %p conn %p\n", rx, conn); - kibnal_close_conn(conn, -ECONNABORTED); + kibnal_close_conn(conn, err); /* Don't re-post rx & drop its ref on conn */ - kibnal_put_conn(conn); + kibnal_conn_decref(conn); } void kibnal_rx (kib_rx_t *rx) { + int rc = 0; kib_msg_t *msg = rx->rx_msg; - /* Clear flag so I can detect if I've sent an RDMA completion */ - rx->rx_rdma = 0; - switch (msg->ibm_type) { case IBNAL_MSG_GET_RDMA: - lib_parse(&kibnal_lib, &msg->ibm_u.rdma.ibrm_hdr, rx); - /* If the incoming get was matched, I'll have initiated the - * RDMA and the completion message... */ - if (rx->rx_rdma) - break; - - /* Otherwise, I'll send a failed completion now to prevent - * the peer's GET blocking for the full timeout. 
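
Every receive buffer consumed must eventually be advertised back to the sender, which is what the credit/rsrvd_credit flags to kibnal_post_rx() in the switch above track: one returned as a normal credit, the other earmarked for pre-reserved RDMA-reply buffers on new-protocol connections. A minimal model of the two counters (struct and names invented for illustration):

#include <stdio.h>

/* toy connection state: the two counters kibnal_post_rx() bumps under
 * ibc_lock */
struct demo_conn {
        int outstanding_credits;   /* normal credits owed back to the peer */
        int reserved_credits;      /* credits earmarked for RDMA replies */
};

static void post_rx(struct demo_conn *c, int credit, int rsrvd_credit)
{
        if (credit)
                c->outstanding_credits++;    /* piggybacked on the next send */
        if (rsrvd_credit)
                c->reserved_credits++;       /* only spent on RDMA completions */
}

int main(void)
{
        struct demo_conn c = { 0, 0 };

        post_rx(&c, 1, 0);    /* ordinary message: give the buffer back */
        post_rx(&c, 0, 1);    /* pre-reserved RDMA-reply buffer returned */
        printf("owed %d normal, %d reserved\n",
               c.outstanding_credits, c.reserved_credits);
        return 0;
}
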
*/ - CERROR ("Completing unmatched RDMA GET from "LPX64"\n", - rx->rx_conn->ibc_peer->ibp_nid); - kibnal_start_active_rdma (IBNAL_MSG_GET_DONE, -EIO, - rx, NULL, 0, NULL, NULL, 0, 0); + rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.rdma.ibrm_hdr, + msg->ibm_srcnid, rx, 1); break; case IBNAL_MSG_PUT_RDMA: - lib_parse(&kibnal_lib, &msg->ibm_u.rdma.ibrm_hdr, rx); - if (rx->rx_rdma) - break; - /* This is most unusual, since even if lib_parse() didn't - * match anything, it should have asked us to read (and - * discard) the payload. The portals header must be - * inconsistent with this message type, so it's the - * sender's fault for sending garbage and she can time - * herself out... */ - CERROR ("Uncompleted RMDA PUT from "LPX64"\n", - rx->rx_conn->ibc_peer->ibp_nid); + rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.rdma.ibrm_hdr, + msg->ibm_srcnid, rx, 1); break; case IBNAL_MSG_IMMEDIATE: - lib_parse(&kibnal_lib, &msg->ibm_u.immediate.ibim_hdr, rx); - LASSERT (!rx->rx_rdma); + rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.immediate.ibim_hdr, + msg->ibm_srcnid, rx, 0); break; - + default: LBUG(); break; } - kibnal_post_rx (rx, 1); + if (rc < 0) { + kibnal_close_conn(rx->rx_conn, rc); + kibnal_post_rx (rx, 1, 0); + } } #if 0 @@ -459,7 +442,7 @@ kibnal_kvaddr_to_phys (unsigned long vaddr, __u64 *physp) if (vaddr >= VMALLOC_START && vaddr < VMALLOC_END) page = vmalloc_to_page ((void *)vaddr); -#if CONFIG_HIGHMEM +#ifdef CONFIG_HIGHMEM else if (vaddr >= PKMAP_BASE && vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE)) page = vmalloc_to_page ((void *)vaddr); @@ -472,14 +455,14 @@ kibnal_kvaddr_to_phys (unsigned long vaddr, __u64 *physp) !VALID_PAGE (page)) return (-EFAULT); - *physp = kibnal_page2phys(page) + (vaddr & (PAGE_SIZE - 1)); + *physp = lnet_page2phys(page) + (vaddr & (PAGE_SIZE - 1)); return (0); } #endif int -kibnal_map_iov (kib_tx_t *tx, enum ib_memory_access access, - int niov, struct iovec *iov, int offset, int nob) +kibnal_map_iov (kib_tx_t *tx, int access, + unsigned int niov, struct iovec *iov, int offset, int nob) { void *vaddr; @@ -521,8 +504,8 @@ kibnal_map_iov (kib_tx_t *tx, enum ib_memory_access access, } int -kibnal_map_kiov (kib_tx_t *tx, enum ib_memory_access access, - int nkiov, ptl_kiov_t *kiov, +kibnal_map_kiov (kib_tx_t *tx, int access, + int nkiov, lnet_kiov_t *kiov, int offset, int nob) { #if IBNAL_FMR @@ -552,7 +535,7 @@ kibnal_map_kiov (kib_tx_t *tx, enum ib_memory_access access, } phys_size = nkiov * sizeof (*phys); - PORTAL_ALLOC(phys, phys_size); + LIBCFS_ALLOC(phys, phys_size); if (phys == NULL) { CERROR ("Can't allocate tmp phys\n"); return (-ENOMEM); @@ -560,9 +543,9 @@ kibnal_map_kiov (kib_tx_t *tx, enum ib_memory_access access, page_offset = kiov->kiov_offset + offset; #if IBNAL_FMR - phys[0] = kibnal_page2phys(kiov->kiov_page); + phys[0] = lnet_page2phys(kiov->kiov_page); #else - phys[0].address = kibnal_page2phys(kiov->kiov_page); + phys[0].address = lnet_page2phys(kiov->kiov_page); phys[0].size = PAGE_SIZE; #endif nphys = 1; @@ -592,7 +575,7 @@ kibnal_map_kiov (kib_tx_t *tx, enum ib_memory_access access, goto out; } - if (nphys == PTL_MD_MAX_IOV) { + if (nphys == LNET_MAX_IOV) { CERROR ("payload too big (%d)\n", nphys); rc = -EMSGSIZE; goto out; @@ -600,9 +583,9 @@ kibnal_map_kiov (kib_tx_t *tx, enum ib_memory_access access, LASSERT (nphys * sizeof (*phys) < phys_size); #if IBNAL_FMR - phys[nphys] = kibnal_page2phys(kiov->kiov_page); + phys[nphys] = lnet_page2phys(kiov->kiov_page); #else - phys[nphys].address = kibnal_page2phys(kiov->kiov_page); + 
phys[nphys].address = lnet_page2phys(kiov->kiov_page); phys[nphys].size = PAGE_SIZE; #endif nphys++; @@ -640,7 +623,7 @@ kibnal_map_kiov (kib_tx_t *tx, enum ib_memory_access access, } out: - PORTAL_FREE(phys, phys_size); + LIBCFS_FREE(phys, phys_size); return (rc); } @@ -664,31 +647,57 @@ kibnal_check_sends (kib_conn_t *conn) kib_tx_t *tx; int rc; int i; + int consume_credit; int done; int nwork; spin_lock_irqsave (&conn->ibc_lock, flags); - LASSERT (conn->ibc_nsends_posted <= IBNAL_MSG_QUEUE_SIZE); + LASSERT (conn->ibc_nsends_posted <= IBNAL_RX_MSGS); + LASSERT (conn->ibc_reserved_credits >= 0); + + while (conn->ibc_reserved_credits > 0 && + !list_empty(&conn->ibc_tx_queue_rsrvd)) { + LASSERT (conn->ibc_version != + IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD); + tx = list_entry(conn->ibc_tx_queue_rsrvd.next, + kib_tx_t, tx_list); + list_del(&tx->tx_list); + list_add_tail(&tx->tx_list, &conn->ibc_tx_queue); + conn->ibc_reserved_credits--; + } if (list_empty(&conn->ibc_tx_queue) && - conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER) { + list_empty(&conn->ibc_tx_queue_nocred) && + (conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER || + kibnal_send_keepalive(conn))) { spin_unlock_irqrestore(&conn->ibc_lock, flags); - tx = kibnal_get_idle_tx(0); /* don't block */ + tx = kibnal_get_idle_tx(); if (tx != NULL) kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0); spin_lock_irqsave(&conn->ibc_lock, flags); - if (tx != NULL) { - atomic_inc(&conn->ibc_refcount); + if (tx != NULL) kibnal_queue_tx_locked(tx, conn); - } } - while (!list_empty (&conn->ibc_tx_queue)) { - tx = list_entry (conn->ibc_tx_queue.next, kib_tx_t, tx_list); + for (;;) { + if (!list_empty(&conn->ibc_tx_queue_nocred)) { + LASSERT (conn->ibc_version != + IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD); + tx = list_entry(conn->ibc_tx_queue_nocred.next, + kib_tx_t, tx_list); + consume_credit = 0; + } else if (!list_empty (&conn->ibc_tx_queue)) { + tx = list_entry (conn->ibc_tx_queue.next, + kib_tx_t, tx_list); + consume_credit = 1; + } else { + /* nothing waiting */ + break; + } /* We rely on this for QP sizing */ LASSERT (tx->tx_nsp > 0 && tx->tx_nsp <= 2); @@ -701,21 +710,25 @@ kibnal_check_sends (kib_conn_t *conn) /* Not on ibc_rdma_queue */ LASSERT (!tx->tx_passive_rdma_wait); - if (conn->ibc_nsends_posted == IBNAL_MSG_QUEUE_SIZE) + if (conn->ibc_nsends_posted == IBNAL_RX_MSGS) break; - if (conn->ibc_credits == 0) /* no credits */ - break; + if (consume_credit) { + if (conn->ibc_credits == 0) /* no credits */ + break; + + if (conn->ibc_credits == 1 && /* last credit reserved for */ + conn->ibc_outstanding_credits == 0) /* giving back credits */ + break; + } - if (conn->ibc_credits == 1 && /* last credit reserved for */ - conn->ibc_outstanding_credits == 0) /* giving back credits */ - break; - list_del (&tx->tx_list); if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP && (!list_empty(&conn->ibc_tx_queue) || - conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER)) { + !list_empty(&conn->ibc_tx_queue_nocred) || + (conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER && + !kibnal_send_keepalive(conn)))) { /* redundant NOOP */ spin_unlock_irqrestore(&conn->ibc_lock, flags); kibnal_tx_done(tx); @@ -723,12 +736,14 @@ kibnal_check_sends (kib_conn_t *conn) continue; } - kibnal_pack_msg(tx->tx_msg, conn->ibc_outstanding_credits, + kibnal_pack_msg(tx->tx_msg, conn->ibc_version, + conn->ibc_outstanding_credits, conn->ibc_peer->ibp_nid, conn->ibc_incarnation); conn->ibc_outstanding_credits = 0; conn->ibc_nsends_posted++; - conn->ibc_credits--; + if 
(consume_credit) + conn->ibc_credits--; tx->tx_sending = tx->tx_nsp; tx->tx_passive_rdma_wait = tx->tx_passive_rdma; @@ -747,19 +762,22 @@ kibnal_check_sends (kib_conn_t *conn) tx->tx_status = 0; /* Driver only accepts 1 item at a time */ for (i = 0; i < tx->tx_nsp; i++) { - rc = ib_send (conn->ibc_qp, &tx->tx_sp[i], 1); + rc = kibnal_ib_send(conn->ibc_qp, &tx->tx_sp[i]); if (rc != 0) break; nwork++; } } + conn->ibc_last_send = jiffies; + spin_lock_irqsave (&conn->ibc_lock, flags); if (rc != 0) { /* NB credits are transferred in the actual * message, which can only be the last work item */ conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits; - conn->ibc_credits++; + if (consume_credit) + conn->ibc_credits++; conn->ibc_nsends_posted--; tx->tx_status = rc; @@ -773,11 +791,11 @@ kibnal_check_sends (kib_conn_t *conn) spin_unlock_irqrestore (&conn->ibc_lock, flags); if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) - CERROR ("Error %d posting transmit to "LPX64"\n", - rc, conn->ibc_peer->ibp_nid); + CERROR ("Error %d posting transmit to %s\n", + rc, libcfs_nid2str(conn->ibc_peer->ibp_nid)); else - CDEBUG (D_NET, "Error %d posting transmit to " - LPX64"\n", rc, conn->ibc_peer->ibp_nid); + CDEBUG (D_NET, "Error %d posting transmit to %s\n", + rc, libcfs_nid2str(conn->ibc_peer->ibp_nid)); kibnal_close_conn (conn, rc); @@ -820,10 +838,7 @@ kibnal_tx_callback (struct ib_cq_entry *e) if (idle) list_del(&tx->tx_list); - CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n", - conn, conn->ibc_state, conn->ibc_peer->ibp_nid, - atomic_read (&conn->ibc_refcount)); - atomic_inc (&conn->ibc_refcount); + kibnal_conn_addref(conn); if (tx->tx_sending == 0) conn->ibc_nsends_posted--; @@ -838,19 +853,20 @@ kibnal_tx_callback (struct ib_cq_entry *e) kibnal_tx_done (tx); if (e->status != IB_COMPLETION_STATUS_SUCCESS) { - CERROR ("Tx completion to "LPX64" failed: %d\n", - conn->ibc_peer->ibp_nid, e->status); + CDEBUG (D_NETERROR, "Tx completion to %s failed: %d\n", + libcfs_nid2str(conn->ibc_peer->ibp_nid), e->status); kibnal_close_conn (conn, -ENETDOWN); } else { + kibnal_peer_alive(conn->ibc_peer); /* can I shovel some more sends out the door? 
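
The two break tests in kibnal_check_sends() above implement the usual credit-deadlock guard: never spend the last send credit unless the outgoing message would itself carry credits back to the peer, otherwise both sides can end up waiting on each other forever. The same decision reduced to a standalone function (names invented):

#include <stdio.h>

/* May we consume a send credit now?  Mirrors the two tests in
 * kibnal_check_sends(): out of credits entirely, or down to the last
 * credit with nothing to give back. */
static int may_send(int credits, int outstanding_credits)
{
        if (credits == 0)
                return 0;
        if (credits == 1 && outstanding_credits == 0)
                return 0;
        return 1;
}

int main(void)
{
        printf("%d\n", may_send(2, 0));  /* 1: plenty of credits */
        printf("%d\n", may_send(1, 3));  /* 1: last credit, but returns some */
        printf("%d\n", may_send(1, 0));  /* 0: hold the last credit back */
        return 0;
}
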
*/ kibnal_check_sends(conn); } - kibnal_put_conn (conn); + kibnal_conn_decref(conn); } void -kibnal_callback (struct ib_cq *cq, struct ib_cq_entry *e, void *arg) +kibnal_callback (ib_cq_t *cq, struct ib_cq_entry *e, void *arg) { if (kibnal_wreqid_is_rx(e->work_request_id)) kibnal_rx_callback (e); @@ -916,11 +932,31 @@ kibnal_queue_tx (kib_tx_t *tx, kib_conn_t *conn) } void -kibnal_launch_tx (kib_tx_t *tx, ptl_nid_t nid) +kibnal_schedule_active_connect_locked (kib_peer_t *peer) +{ + /* Called with exclusive kib_global_lock */ + + peer->ibp_connecting++; + kibnal_peer_addref(peer); /* extra ref for connd */ + + spin_lock (&kibnal_data.kib_connd_lock); + + LASSERT (list_empty(&peer->ibp_connd_list)); + list_add_tail (&peer->ibp_connd_list, + &kibnal_data.kib_connd_peers); + wake_up (&kibnal_data.kib_connd_waitq); + + spin_unlock (&kibnal_data.kib_connd_lock); +} + +void +kibnal_launch_tx (kib_tx_t *tx, lnet_nid_t nid) { unsigned long flags; kib_peer_t *peer; kib_conn_t *conn; + int retry; + int rc; rwlock_t *g_lock = &kibnal_data.kib_global_lock; /* If I get here, I've committed to send, so I complete the tx with @@ -929,71 +965,72 @@ kibnal_launch_tx (kib_tx_t *tx, ptl_nid_t nid) LASSERT (tx->tx_conn == NULL); /* only set when assigned a conn */ LASSERT (tx->tx_nsp > 0); /* work items have been set up */ - read_lock (g_lock); + for (retry = 0; ; retry = 1) { + read_lock_irqsave(g_lock, flags); - peer = kibnal_find_peer_locked (nid); - if (peer == NULL) { - read_unlock (g_lock); - tx->tx_status = -EHOSTUNREACH; - kibnal_tx_done (tx); - return; - } - - conn = kibnal_find_conn_locked (peer); - if (conn != NULL) { - CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n", - conn, conn->ibc_state, conn->ibc_peer->ibp_nid, - atomic_read (&conn->ibc_refcount)); - atomic_inc (&conn->ibc_refcount); /* 1 ref for the tx */ - read_unlock (g_lock); + peer = kibnal_find_peer_locked (nid); + if (peer != NULL) { + conn = kibnal_find_conn_locked (peer); + if (conn != NULL) { + kibnal_conn_addref(conn); /* 1 ref for me...*/ + read_unlock_irqrestore(g_lock, flags); - kibnal_queue_tx (tx, conn); - return; - } - - /* Making one or more connections; I'll need a write lock... */ - read_unlock (g_lock); - write_lock_irqsave (g_lock, flags); + kibnal_queue_tx (tx, conn); + kibnal_conn_decref(conn); /* ...until here */ + return; + } + } + + /* Making one or more connections; I'll need a write lock... */ + read_unlock(g_lock); + write_lock(g_lock); - peer = kibnal_find_peer_locked (nid); - if (peer == NULL) { + peer = kibnal_find_peer_locked (nid); + if (peer != NULL) + break; + write_unlock_irqrestore (g_lock, flags); - tx->tx_status = -EHOSTUNREACH; - kibnal_tx_done (tx); - return; + + if (retry) { + CERROR("Can't find peer %s\n", libcfs_nid2str(nid)); + tx->tx_status = -EHOSTUNREACH; + kibnal_tx_done (tx); + return; + } + + rc = kibnal_add_persistent_peer(nid, LNET_NIDADDR(nid), + lnet_acceptor_port()); + if (rc != 0) { + CERROR("Can't add peer %s: %d\n", + libcfs_nid2str(nid), rc); + tx->tx_status = rc; + kibnal_tx_done(tx); + return; + } } conn = kibnal_find_conn_locked (peer); if (conn != NULL) { /* Connection exists; queue message on it */ - CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n", - conn, conn->ibc_state, conn->ibc_peer->ibp_nid, - atomic_read (&conn->ibc_refcount)); - atomic_inc (&conn->ibc_refcount); /* 1 ref for the tx */ + kibnal_conn_addref(conn); /* +1 ref from me... 
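
The addref/decref bracket above pins the connection only across the window where the lock is dropped; the queued tx is assumed, as elsewhere in this patch, to take its own reference inside kibnal_queue_tx(). A generic sketch of the bracket using C11 atomics instead of the kernel's atomic_t (all names invented):

#include <stdatomic.h>
#include <stdio.h>

struct demo_conn {
        atomic_int refcount;
};

static void conn_addref(struct demo_conn *c)
{
        atomic_fetch_add(&c->refcount, 1);
}

static void conn_decref(struct demo_conn *c)
{
        if (atomic_fetch_sub(&c->refcount, 1) == 1)
                printf("last ref dropped: destroy conn\n");
}

static void queue_tx(struct demo_conn *c)
{
        conn_addref(c);               /* tx holds its own ref while queued */
}

int main(void)
{
        struct demo_conn c = { 1 };   /* creator's reference */

        conn_addref(&c);              /* 1 ref for me... */
        queue_tx(&c);                 /* tx now pins the conn itself */
        conn_decref(&c);              /* ...until here */

        conn_decref(&c);              /* tx completes, drops its ref */
        conn_decref(&c);              /* creator drops the last ref */
        return 0;
}
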
*/ write_unlock_irqrestore (g_lock, flags); kibnal_queue_tx (tx, conn); + kibnal_conn_decref(conn); /* ...until here */ return; } - if (peer->ibp_connecting == 0) { - if (!time_after_eq(jiffies, peer->ibp_reconnect_time)) { + if (peer->ibp_connecting == 0 && + peer->ibp_accepting == 0) { + if (!(peer->ibp_reconnect_interval == 0 || /* first attempt */ + time_after_eq(jiffies, peer->ibp_reconnect_time))) { write_unlock_irqrestore (g_lock, flags); tx->tx_status = -EHOSTUNREACH; kibnal_tx_done (tx); return; } - peer->ibp_connecting = 1; - atomic_inc (&peer->ibp_refcount); /* extra ref for connd */ - - spin_lock (&kibnal_data.kib_connd_lock); - - list_add_tail (&peer->ibp_connd_list, - &kibnal_data.kib_connd_peers); - wake_up (&kibnal_data.kib_connd_waitq); - - spin_unlock (&kibnal_data.kib_connd_lock); + kibnal_schedule_active_connect_locked(peer); } /* A connection is being established; queue the message... */ @@ -1002,11 +1039,27 @@ kibnal_launch_tx (kib_tx_t *tx, ptl_nid_t nid) write_unlock_irqrestore (g_lock, flags); } -ptl_err_t -kibnal_start_passive_rdma (int type, ptl_nid_t nid, - lib_msg_t *libmsg, ptl_hdr_t *hdr) +void +kibnal_txlist_done (struct list_head *txlist, int status) +{ + kib_tx_t *tx; + + while (!list_empty(txlist)) { + tx = list_entry (txlist->next, kib_tx_t, tx_list); + + list_del (&tx->tx_list); + /* complete now */ + tx->tx_status = status; + kibnal_tx_done (tx); + } +} + +int +kibnal_start_passive_rdma (int type, lnet_msg_t *lntmsg, + int niov, struct iovec *iov, lnet_kiov_t *kiov, + int nob) { - int nob = libmsg->md->length; + lnet_nid_t nid = lntmsg->msg_target.nid; kib_tx_t *tx; kib_msg_t *ibmsg; int rc; @@ -1024,32 +1077,33 @@ kibnal_start_passive_rdma (int type, ptl_nid_t nid, IB_ACCESS_LOCAL_WRITE; } - tx = kibnal_get_idle_tx (1); /* May block; caller is an app thread */ - LASSERT (tx != NULL); + tx = kibnal_get_idle_tx (); + if (tx == NULL) { + CERROR("Can't allocate %s txd for %s\n", + (type == IBNAL_MSG_PUT_RDMA) ? 
"PUT/REPLY" : "GET", + libcfs_nid2str(nid)); + return -ENOMEM; + } - if ((libmsg->md->options & PTL_MD_KIOV) == 0) - rc = kibnal_map_iov (tx, access, - libmsg->md->md_niov, - libmsg->md->md_iov.iov, - 0, nob); + + if (iov != NULL) + rc = kibnal_map_iov (tx, access, niov, iov, 0, nob); else - rc = kibnal_map_kiov (tx, access, - libmsg->md->md_niov, - libmsg->md->md_iov.kiov, - 0, nob); + rc = kibnal_map_kiov (tx, access, niov, kiov, 0, nob); if (rc != 0) { - CERROR ("Can't map RDMA for "LPX64": %d\n", nid, rc); + CERROR ("Can't map RDMA for %s: %d\n", + libcfs_nid2str(nid), rc); goto failed; } if (type == IBNAL_MSG_GET_RDMA) { /* reply gets finalized when tx completes */ - tx->tx_libmsg[1] = lib_create_reply_msg(&kibnal_lib, - nid, libmsg); - if (tx->tx_libmsg[1] == NULL) { - CERROR ("Can't create reply for GET -> "LPX64"\n", - nid); + tx->tx_lntmsg[1] = lnet_create_reply_msg(kibnal_data.kib_ni, + lntmsg); + if (tx->tx_lntmsg[1] == NULL) { + CERROR ("Can't create reply for GET -> %s\n", + libcfs_nid2str(nid)); rc = -ENOMEM; goto failed; } @@ -1059,7 +1113,7 @@ kibnal_start_passive_rdma (int type, ptl_nid_t nid, ibmsg = tx->tx_msg; - ibmsg->ibm_u.rdma.ibrm_hdr = *hdr; + ibmsg->ibm_u.rdma.ibrm_hdr = lntmsg->msg_hdr; ibmsg->ibm_u.rdma.ibrm_cookie = tx->tx_passive_rdma_cookie; ibmsg->ibm_u.rdma.ibrm_desc.rd_key = tx->tx_md.md_rkey; ibmsg->ibm_u.rdma.ibrm_desc.rd_addr = tx->tx_md.md_addr; @@ -1072,24 +1126,24 @@ kibnal_start_passive_rdma (int type, ptl_nid_t nid, tx, tx->tx_passive_rdma_cookie, tx->tx_md.md_rkey, tx->tx_md.md_addr, nob); - /* libmsg gets finalized when tx completes. */ - tx->tx_libmsg[0] = libmsg; + /* lntmsg gets finalized when tx completes. */ + tx->tx_lntmsg[0] = lntmsg; kibnal_launch_tx(tx, nid); - return (PTL_OK); + return (0); failed: tx->tx_status = rc; kibnal_tx_done (tx); - return (PTL_FAIL); + return (-EIO); } void kibnal_start_active_rdma (int type, int status, - kib_rx_t *rx, lib_msg_t *libmsg, - unsigned int niov, - struct iovec *iov, ptl_kiov_t *kiov, - int offset, int nob) + kib_rx_t *rx, lnet_msg_t *lntmsg, + unsigned int niov, + struct iovec *iov, lnet_kiov_t *kiov, + int offset, int nob) { kib_msg_t *rxmsg = rx->rx_msg; kib_msg_t *txmsg; @@ -1113,12 +1167,6 @@ kibnal_start_active_rdma (int type, int status, LASSERT (type == IBNAL_MSG_GET_DONE || type == IBNAL_MSG_PUT_DONE); - /* Flag I'm completing the RDMA. Even if I fail to send the - * completion message, I will have tried my best so further - * attempts shouldn't be tried. 
*/ - LASSERT (!rx->rx_rdma); - rx->rx_rdma = 1; - if (type == IBNAL_MSG_GET_DONE) { access = 0; rdma_op = IB_OP_RDMA_WRITE; @@ -1129,12 +1177,12 @@ kibnal_start_active_rdma (int type, int status, LASSERT (rxmsg->ibm_type == IBNAL_MSG_PUT_RDMA); } - tx = kibnal_get_idle_tx (0); /* Mustn't block */ + tx = kibnal_get_idle_tx (); if (tx == NULL) { - CERROR ("tx descs exhausted on RDMA from "LPX64 + CERROR ("tx descs exhausted on RDMA from %s" " completing locally with failure\n", - rx->rx_conn->ibc_peer->ibp_nid); - lib_finalize (&kibnal_lib, NULL, libmsg, PTL_NO_SPACE); + libcfs_nid2str(rx->rx_conn->ibc_peer->ibp_nid)); + lnet_finalize (kibnal_data.kib_ni, lntmsg, -ENOMEM); return; } LASSERT (tx->tx_nsp == 0); @@ -1152,8 +1200,9 @@ kibnal_start_active_rdma (int type, int status, niov, iov, offset, nob); if (rc != 0) { - CERROR ("Can't map RDMA -> "LPX64": %d\n", - rx->rx_conn->ibc_peer->ibp_nid, rc); + CERROR ("Can't map RDMA -> %s: %d\n", + libcfs_nid2str(rx->rx_conn->ibc_peer->ibp_nid), + rc); /* We'll skip the RDMA and complete with failure. */ status = rc; nob = 0; @@ -1192,53 +1241,45 @@ kibnal_start_active_rdma (int type, int status, if (status == 0 && nob != 0) { LASSERT (tx->tx_nsp > 1); - /* RDMA: libmsg gets finalized when the tx completes. This + /* RDMA: lntmsg gets finalized when the tx completes. This * is after the completion message has been sent, which in * turn is after the RDMA has finished. */ - tx->tx_libmsg[0] = libmsg; + tx->tx_lntmsg[0] = lntmsg; } else { LASSERT (tx->tx_nsp == 1); /* No RDMA: local completion happens now! */ CDEBUG(D_NET, "No data: immediate completion\n"); - lib_finalize (&kibnal_lib, NULL, libmsg, - status == 0 ? PTL_OK : PTL_FAIL); + lnet_finalize (kibnal_data.kib_ni, lntmsg, + status == 0 ? 0 : -EIO); } - /* +1 ref for this tx... */ - CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n", - rx->rx_conn, rx->rx_conn->ibc_state, - rx->rx_conn->ibc_peer->ibp_nid, - atomic_read (&rx->rx_conn->ibc_refcount)); - atomic_inc (&rx->rx_conn->ibc_refcount); - /* ...and queue it up */ kibnal_queue_tx(tx, rx->rx_conn); } -ptl_err_t -kibnal_sendmsg(lib_nal_t *nal, - void *private, - lib_msg_t *libmsg, - ptl_hdr_t *hdr, - int type, - ptl_nid_t nid, - ptl_pid_t pid, - unsigned int payload_niov, - struct iovec *payload_iov, - ptl_kiov_t *payload_kiov, - int payload_offset, - int payload_nob) +int +kibnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) { - kib_msg_t *ibmsg; - kib_tx_t *tx; - int nob; + lnet_hdr_t *hdr = &lntmsg->msg_hdr; + int type = lntmsg->msg_type; + lnet_process_id_t target = lntmsg->msg_target; + int target_is_router = lntmsg->msg_target_is_router; + int routing = lntmsg->msg_routing; + unsigned int payload_niov = lntmsg->msg_niov; + struct iovec *payload_iov = lntmsg->msg_iov; + lnet_kiov_t *payload_kiov = lntmsg->msg_kiov; + unsigned int payload_offset = lntmsg->msg_offset; + unsigned int payload_nob = lntmsg->msg_len; + kib_msg_t *ibmsg; + kib_tx_t *tx; + int nob; /* NB 'private' is different depending on what we're sending.... 
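
The switch below sends small payloads inline (IMMEDIATE) and falls back to passive RDMA once header plus payload would overflow a single pre-posted message buffer. The size test, reduced to a self-contained program (struct layout and DEMO_MSG_SIZE are invented stand-ins for kib_msg_t and IBNAL_MSG_SIZE):

#include <stddef.h>
#include <stdio.h>

/* stand-in for kib_msg_t's immediate variant; layout invented */
typedef struct {
        char hdr[64];           /* message header fields */
        char ibim_payload[1];   /* payload starts here */
} demo_msg_t;

#define DEMO_MSG_SIZE 4096      /* stand-in for IBNAL_MSG_SIZE */

/* same shape as the test in kibnal_send(): header + payload must fit
 * in one pre-posted receive buffer, else use RDMA */
static int fits_immediate(size_t payload_nob)
{
        return offsetof(demo_msg_t, ibim_payload) + payload_nob
                <= DEMO_MSG_SIZE;
}

int main(void)
{
        printf("100 bytes  -> %s\n", fits_immediate(100)  ? "IMMEDIATE" : "RDMA");
        printf("8192 bytes -> %s\n", fits_immediate(8192) ? "IMMEDIATE" : "RDMA");
        return 0;
}
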
*/ - CDEBUG(D_NET, "sending %d bytes in %d frags to nid:"LPX64" pid %d\n", - payload_nob, payload_niov, nid , pid); + CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n", + payload_nob, payload_niov, libcfs_id2str(target)); LASSERT (payload_nob == 0 || payload_niov > 0); - LASSERT (payload_niov <= PTL_MD_MAX_IOV); + LASSERT (payload_niov <= LNET_MAX_IOV); /* Thread context if we're sending payload */ LASSERT (!in_interrupt() || payload_niov == 0); @@ -1248,126 +1289,112 @@ kibnal_sendmsg(lib_nal_t *nal, switch (type) { default: LBUG(); - return (PTL_FAIL); + return (-EIO); - case PTL_MSG_REPLY: { - /* reply's 'private' is the incoming receive */ - kib_rx_t *rx = private; - - /* RDMA reply expected? */ - if (rx->rx_msg->ibm_type == IBNAL_MSG_GET_RDMA) { - kibnal_start_active_rdma(IBNAL_MSG_GET_DONE, 0, - rx, libmsg, payload_niov, - payload_iov, payload_kiov, - payload_offset, payload_nob); - return (PTL_OK); - } - - /* Incoming message consistent with immediate reply? */ - if (rx->rx_msg->ibm_type != IBNAL_MSG_IMMEDIATE) { - CERROR ("REPLY to "LPX64" bad opbm type %d!!!\n", - nid, rx->rx_msg->ibm_type); - return (PTL_FAIL); - } - - /* Will it fit in a message? */ - nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]); - if (nob >= IBNAL_MSG_SIZE) { - CERROR("REPLY for "LPX64" too big (RDMA not requested): %d\n", - nid, payload_nob); - return (PTL_FAIL); - } - break; - } - - case PTL_MSG_GET: - /* might the REPLY message be big enough to need RDMA? */ - nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[libmsg->md->length]); - if (nob > IBNAL_MSG_SIZE) - return (kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA, - nid, libmsg, hdr)); - break; - - case PTL_MSG_ACK: + case LNET_MSG_ACK: LASSERT (payload_nob == 0); break; - case PTL_MSG_PUT: - /* Is the payload big enough to need RDMA? */ + case LNET_MSG_GET: + if (routing || target_is_router) + break; /* send IMMEDIATE */ + + /* is the REPLY message too small for RDMA? */ + nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]); + if (nob <= IBNAL_MSG_SIZE) + break; /* send IMMEDIATE */ + + if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0) + return kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA, lntmsg, + lntmsg->msg_md->md_niov, + lntmsg->msg_md->md_iov.iov, NULL, + lntmsg->msg_md->md_length); + + return kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA, lntmsg, + lntmsg->msg_md->md_niov, + NULL, lntmsg->msg_md->md_iov.kiov, + lntmsg->msg_md->md_length); + + case LNET_MSG_REPLY: + case LNET_MSG_PUT: + /* Is the payload small enough not to need RDMA? */ nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]); - if (nob > IBNAL_MSG_SIZE) - return (kibnal_start_passive_rdma(IBNAL_MSG_PUT_RDMA, - nid, libmsg, hdr)); + if (nob <= IBNAL_MSG_SIZE) + break; /* send IMMEDIATE */ - break; + return kibnal_start_passive_rdma(IBNAL_MSG_PUT_RDMA, lntmsg, + payload_niov, + payload_iov, payload_kiov, + payload_nob); } - tx = kibnal_get_idle_tx(!(type == PTL_MSG_ACK || - type == PTL_MSG_REPLY || - in_interrupt())); + /* Send IMMEDIATE */ + + tx = kibnal_get_idle_tx(); if (tx == NULL) { - CERROR ("Can't send %d to "LPX64": tx descs exhausted%s\n", - type, nid, in_interrupt() ? " (intr)" : ""); - return (PTL_NO_SPACE); + CERROR ("Can't send %d to %s: tx descs exhausted%s\n", + type, libcfs_nid2str(target.nid), + in_interrupt() ? 
" (intr)" : ""); + return (-ENOMEM); } ibmsg = tx->tx_msg; ibmsg->ibm_u.immediate.ibim_hdr = *hdr; - if (payload_nob > 0) { - if (payload_kiov != NULL) - lib_copy_kiov2buf(ibmsg->ibm_u.immediate.ibim_payload, - payload_niov, payload_kiov, - payload_offset, payload_nob); - else - lib_copy_iov2buf(ibmsg->ibm_u.immediate.ibim_payload, - payload_niov, payload_iov, - payload_offset, payload_nob); - } + if (payload_kiov != NULL) + lnet_copy_kiov2flat(IBNAL_MSG_SIZE, ibmsg, + offsetof(kib_msg_t, ibm_u.immediate.ibim_payload), + payload_niov, payload_kiov, + payload_offset, payload_nob); + else + lnet_copy_iov2flat(IBNAL_MSG_SIZE, ibmsg, + offsetof(kib_msg_t, ibm_u.immediate.ibim_payload), + payload_niov, payload_iov, + payload_offset, payload_nob); kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE, offsetof(kib_immediate_msg_t, ibim_payload[payload_nob])); - /* libmsg gets finalized when tx completes */ - tx->tx_libmsg[0] = libmsg; + /* lntmsg gets finalized when tx completes */ + tx->tx_lntmsg[0] = lntmsg; - kibnal_launch_tx(tx, nid); - return (PTL_OK); -} - -ptl_err_t -kibnal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie, - ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, - unsigned int payload_niov, struct iovec *payload_iov, - size_t payload_offset, size_t payload_len) -{ - return (kibnal_sendmsg(nal, private, cookie, - hdr, type, nid, pid, - payload_niov, payload_iov, NULL, - payload_offset, payload_len)); + kibnal_launch_tx(tx, target.nid); + return (0); } -ptl_err_t -kibnal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, - ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, - unsigned int payload_niov, ptl_kiov_t *payload_kiov, - size_t payload_offset, size_t payload_len) +int +kibnal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, + void **new_private) { - return (kibnal_sendmsg(nal, private, cookie, - hdr, type, nid, pid, - payload_niov, NULL, payload_kiov, - payload_offset, payload_len)); + kib_rx_t *rx = private; + kib_conn_t *conn = rx->rx_conn; + + if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) { + /* Can't block if RDMA completions need normal credits */ + LCONSOLE_ERROR_MSG(0x12a, + "Dropping message from %s: no buffers free. 
" + "%s is running an old version of LNET that may " + "deadlock if messages wait for buffers)\n", + libcfs_nid2str(conn->ibc_peer->ibp_nid), + libcfs_nid2str(conn->ibc_peer->ibp_nid)); + return -EDEADLK; + } + + *new_private = private; + return 0; } -ptl_err_t -kibnal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg, - unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov, - int offset, int mlen, int rlen) +int +kibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, + int delayed, unsigned int niov, + struct iovec *iov, lnet_kiov_t *kiov, + unsigned int offset, unsigned int mlen, unsigned int rlen) { kib_rx_t *rx = private; kib_msg_t *rxmsg = rx->rx_msg; int msg_nob; + int rc = 0; LASSERT (mlen <= rlen); LASSERT (!in_interrupt ()); @@ -1377,59 +1404,58 @@ kibnal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg, switch (rxmsg->ibm_type) { default: LBUG(); - return (PTL_FAIL); - + case IBNAL_MSG_IMMEDIATE: msg_nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]); - if (msg_nob > IBNAL_MSG_SIZE) { - CERROR ("Immediate message from "LPX64" too big: %d\n", - rxmsg->ibm_u.immediate.ibim_hdr.src_nid, rlen); - return (PTL_FAIL); + if (msg_nob > rx->rx_nob) { + CERROR ("Immediate message from %s too big: %d(%d)\n", + libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid), + msg_nob, rx->rx_nob); + rc = -EPROTO; + break; } if (kiov != NULL) - lib_copy_buf2kiov(niov, kiov, offset, - rxmsg->ibm_u.immediate.ibim_payload, - mlen); + lnet_copy_flat2kiov( + niov, kiov, offset, + IBNAL_MSG_SIZE, rxmsg, + offsetof(kib_msg_t, ibm_u.immediate.ibim_payload), + mlen); else - lib_copy_buf2iov(niov, iov, offset, - rxmsg->ibm_u.immediate.ibim_payload, - mlen); + lnet_copy_flat2iov( + niov, iov, offset, + IBNAL_MSG_SIZE, rxmsg, + offsetof(kib_msg_t, ibm_u.immediate.ibim_payload), + mlen); - lib_finalize (nal, NULL, libmsg, PTL_OK); - return (PTL_OK); + lnet_finalize (ni, lntmsg, 0); + break; case IBNAL_MSG_GET_RDMA: - /* We get called here just to discard any junk after the - * GET hdr. 
*/
-        LASSERT (libmsg == NULL);
-        lib_finalize (nal, NULL, libmsg, PTL_OK);
-        return (PTL_OK);
+        if (lntmsg != NULL) {
+                /* GET matched: RDMA lntmsg's payload */
+                kibnal_start_active_rdma(IBNAL_MSG_GET_DONE, 0,
+                                         rx, lntmsg,
+                                         lntmsg->msg_niov,
+                                         lntmsg->msg_iov,
+                                         lntmsg->msg_kiov,
+                                         lntmsg->msg_offset,
+                                         lntmsg->msg_len);
+        } else {
+                /* GET didn't match anything */
+                kibnal_start_active_rdma (IBNAL_MSG_GET_DONE, -ENODATA,
+                                          rx, NULL, 0, NULL, NULL, 0, 0);
+        }
+        break;
 
         case IBNAL_MSG_PUT_RDMA:
-                kibnal_start_active_rdma (IBNAL_MSG_PUT_DONE, 0,
-                                          rx, libmsg,
+                kibnal_start_active_rdma (IBNAL_MSG_PUT_DONE, 0, rx, lntmsg,
                                           niov, iov, kiov, offset, mlen);
-                return (PTL_OK);
+                break;
         }
-}
-
-ptl_err_t
-kibnal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
-             unsigned int niov, struct iovec *iov,
-             size_t offset, size_t mlen, size_t rlen)
-{
-        return (kibnal_recvmsg (nal, private, msg, niov, iov, NULL,
-                                offset, mlen, rlen));
-}
 
-ptl_err_t
-kibnal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
-                   unsigned int niov, ptl_kiov_t *kiov,
-                   size_t offset, size_t mlen, size_t rlen)
-{
-        return (kibnal_recvmsg (nal, private, msg, niov, NULL, kiov,
-                                offset, mlen, rlen));
+        kibnal_post_rx(rx, 1, 0);
+        return rc;
 }
 
 int
@@ -1451,6 +1477,40 @@ kibnal_thread_fini (void)
 }
 
 void
+kibnal_peer_alive (kib_peer_t *peer)
+{
+        /* This is racy, but everyone's only writing cfs_time_current() */
+        peer->ibp_last_alive = cfs_time_current();
+        mb();
+}
+
+void
+kibnal_peer_notify (kib_peer_t *peer)
+{
+        time_t        last_alive = 0;
+        int           error = 0;
+        unsigned long flags;
+
+        read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
+
+        if (list_empty(&peer->ibp_conns) &&
+            peer->ibp_accepting == 0 &&
+            peer->ibp_connecting == 0 &&
+            peer->ibp_error != 0) {
+                error = peer->ibp_error;
+                peer->ibp_error = 0;
+                last_alive = cfs_time_current_sec() -
+                             cfs_duration_sec(cfs_time_current() -
+                                              peer->ibp_last_alive);
+        }
+
+        read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
+
+        if (error != 0)
+                lnet_notify(kibnal_data.kib_ni, peer->ibp_nid, 0, last_alive);
+}
+
+void
 kibnal_close_conn_locked (kib_conn_t *conn, int error)
 {
         /* This just does the immediate housekeeping and schedules the
          * connection rundown; caller holds kib_global_lock exclusively
          * in irq context */
         kib_peer_t        *peer = conn->ibc_peer;
 
-        CDEBUG (error == 0 ? D_NET : D_ERROR,
-                "closing conn to "LPX64": error %d\n", peer->ibp_nid, error);
+        CDEBUG (error == 0 ? D_NET : D_NETERROR,
+                "closing conn to %s: error %d\n",
+                libcfs_nid2str(peer->ibp_nid), error);
 
         LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED ||
                  conn->ibc_state == IBNAL_CONN_CONNECTING);
@@ -1469,16 +1530,15 @@
                 list_del (&conn->ibc_list);
         } else {
                 /* new ref for kib_reaper_conns */
-                CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
-                       conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
-                       atomic_read (&conn->ibc_refcount));
-                atomic_inc (&conn->ibc_refcount);
+                kibnal_conn_addref(conn);
         }
 
-        if (list_empty (&peer->ibp_conns) &&
-            peer->ibp_persistence == 0) {
-                /* Non-persistent peer with no more conns... 
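
ibp_last_alive above is recorded in jiffies, while lnet_notify() wants wall-clock seconds; hence the subtraction in kibnal_peer_notify(): take the age of the timestamp in jiffies, scale it to seconds, and deduct it from the current epoch time. The same arithmetic in isolation (DEMO_HZ is an assumed tick rate, not the kernel's HZ):

#include <stdio.h>
#include <time.h>

#define DEMO_HZ 1000   /* jiffies per second on this hypothetical kernel */

/* translate a jiffies timestamp into absolute epoch seconds, the way
 * kibnal_peer_notify() feeds lnet_notify() */
static time_t last_alive_sec(unsigned long now_jiffies,
                             unsigned long alive_jiffies, time_t now_sec)
{
        return now_sec - (time_t)((now_jiffies - alive_jiffies) / DEMO_HZ);
}

int main(void)
{
        /* peer last heard from 5000 jiffies (5 s) ago */
        printf("%ld\n", (long)last_alive_sec(123456789UL, 123451789UL,
                                             time(NULL)));
        return 0;
}
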
*/ - kibnal_unlink_peer_locked (peer); + if (list_empty (&peer->ibp_conns)) { /* no more conns */ + if (peer->ibp_persistence == 0 && /* non-persistent peer */ + kibnal_peer_active(peer)) /* still in peer table */ + kibnal_unlink_peer_locked (peer); + + peer->ibp_error = error; /* set/clear error on last conn */ } conn->ibc_state = IBNAL_CONN_DEATHROW; @@ -1512,48 +1572,55 @@ kibnal_close_conn (kib_conn_t *conn, int why) } void -kibnal_peer_connect_failed (kib_peer_t *peer, int active, int rc) +kibnal_peer_connect_failed (kib_peer_t *peer, int active, int error) { LIST_HEAD (zombies); - kib_tx_t *tx; unsigned long flags; - LASSERT (rc != 0); - LASSERT (peer->ibp_reconnect_interval >= IBNAL_MIN_RECONNECT_INTERVAL); + LASSERT(error != 0); write_lock_irqsave (&kibnal_data.kib_global_lock, flags); - LASSERT (peer->ibp_connecting != 0); - peer->ibp_connecting--; + if (active) { + LASSERT (peer->ibp_connecting != 0); + peer->ibp_connecting--; + } else { + LASSERT (peer->ibp_accepting != 0); + peer->ibp_accepting--; + } - if (peer->ibp_connecting != 0) { - /* another connection attempt under way (loopback?)... */ + if (peer->ibp_connecting != 0 || + peer->ibp_accepting != 0) { + /* another connection attempt under way... */ write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags); return; } if (list_empty(&peer->ibp_conns)) { /* Say when active connection can be re-attempted */ - peer->ibp_reconnect_time = jiffies + peer->ibp_reconnect_interval; - /* Increase reconnection interval */ - peer->ibp_reconnect_interval = MIN (peer->ibp_reconnect_interval * 2, - IBNAL_MAX_RECONNECT_INTERVAL); + peer->ibp_reconnect_interval *= 2; + peer->ibp_reconnect_interval = + MAX(peer->ibp_reconnect_interval, + *kibnal_tunables.kib_min_reconnect_interval); + peer->ibp_reconnect_interval = + MIN(peer->ibp_reconnect_interval, + *kibnal_tunables.kib_max_reconnect_interval); + + peer->ibp_reconnect_time = jiffies + + peer->ibp_reconnect_interval * HZ; - /* Take peer's blocked blocked transmits; I'll complete + /* Take peer's blocked transmits; I'll complete * them with error */ - while (!list_empty (&peer->ibp_tx_queue)) { - tx = list_entry (peer->ibp_tx_queue.next, - kib_tx_t, tx_list); - - list_del (&tx->tx_list); - list_add_tail (&tx->tx_list, &zombies); - } + list_add(&zombies, &peer->ibp_tx_queue); + list_del_init(&peer->ibp_tx_queue); if (kibnal_peer_active(peer) && (peer->ibp_persistence == 0)) { /* failed connection attempt on non-persistent peer */ kibnal_unlink_peer_locked (peer); } + + peer->ibp_error = error; } else { /* Can't have blocked transmits if there are connections */ LASSERT (list_empty(&peer->ibp_tx_queue)); @@ -1561,18 +1628,13 @@ kibnal_peer_connect_failed (kib_peer_t *peer, int active, int rc) write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags); + kibnal_peer_notify(peer); + if (!list_empty (&zombies)) - CERROR ("Deleting messages for "LPX64": connection failed\n", - peer->ibp_nid); + CDEBUG (D_NETERROR, "Deleting messages for %s: connection failed\n", + libcfs_nid2str(peer->ibp_nid)); - while (!list_empty (&zombies)) { - tx = list_entry (zombies.next, kib_tx_t, tx_list); - - list_del (&tx->tx_list); - /* complete now */ - tx->tx_status = -EHOSTUNREACH; - kibnal_tx_done (tx); - } + kibnal_txlist_done(&zombies, -EHOSTUNREACH); } void @@ -1585,55 +1647,63 @@ kibnal_connreq_done (kib_conn_t *conn, int active, int status) int rc; int i; - /* passive connection has no connreq & vice versa */ - LASSERT (!active == !(conn->ibc_connreq != NULL)); - if (active) { - PORTAL_FREE 
(conn->ibc_connreq, sizeof (*conn->ibc_connreq)); + if (conn->ibc_connreq != NULL) { + LIBCFS_FREE (conn->ibc_connreq, sizeof (*conn->ibc_connreq)); conn->ibc_connreq = NULL; } - if (state == IBNAL_CONN_CONNECTING) { - /* Install common (active/passive) callback for - * disconnect/idle notification if I got as far as getting - * a CM comm_id */ - rc = tsIbCmCallbackModify(conn->ibc_comm_id, - kibnal_conn_callback, conn); - LASSERT (rc == 0); + switch (state) { + case IBNAL_CONN_CONNECTING: + /* conn has a CM comm_id */ + if (status == 0) { + /* Install common (active/passive) callback for + * disconnect/idle notification */ + rc = tsIbCmCallbackModify(conn->ibc_comm_id, + kibnal_conn_callback, + conn); + LASSERT (rc == 0); + } else { + /* LASSERT (no more CM callbacks) */ + rc = tsIbCmCallbackModify(conn->ibc_comm_id, + kibnal_bad_conn_callback, + conn); + LASSERT (rc == 0); + } + break; + + case IBNAL_CONN_INIT_QP: + LASSERT (status != 0); + break; + + default: + LBUG(); } write_lock_irqsave (&kibnal_data.kib_global_lock, flags); - LASSERT (peer->ibp_connecting != 0); + if (active) + LASSERT (peer->ibp_connecting != 0); + else + LASSERT (peer->ibp_accepting != 0); - if (status == 0) { - /* connection established... */ - LASSERT (state == IBNAL_CONN_CONNECTING); - conn->ibc_state = IBNAL_CONN_ESTABLISHED; - - if (!kibnal_peer_active(peer)) { - /* ...but peer deleted meantime */ - status = -ECONNABORTED; - } - } else { - LASSERT (state == IBNAL_CONN_INIT_QP || - state == IBNAL_CONN_CONNECTING); - } + if (status == 0 && /* connection established */ + kibnal_peer_active(peer)) { /* peer not deleted */ - if (status == 0) { - /* Everything worked! */ + if (active) + peer->ibp_connecting--; + else + peer->ibp_accepting--; - peer->ibp_connecting--; + conn->ibc_last_send = jiffies; + conn->ibc_state = IBNAL_CONN_ESTABLISHED; + kibnal_peer_alive(peer); /* +1 ref for ibc_list; caller(== CM)'s ref remains until * the IB_CM_IDLE callback */ - CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n", - conn, conn->ibc_state, conn->ibc_peer->ibp_nid, - atomic_read (&conn->ibc_refcount)); - atomic_inc (&conn->ibc_refcount); + kibnal_conn_addref(conn); list_add (&conn->ibc_list, &peer->ibp_conns); - - /* reset reconnect interval for next attempt */ - peer->ibp_reconnect_interval = IBNAL_MIN_RECONNECT_INTERVAL; + + peer->ibp_reconnect_interval = 0; /* OK to reconnect at any time */ /* post blocked sends to the new connection */ spin_lock (&conn->ibc_lock); @@ -1644,11 +1714,6 @@ kibnal_connreq_done (kib_conn_t *conn, int active, int status) list_del (&tx->tx_list); - /* +1 ref for each tx */ - CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n", - conn, conn->ibc_state, conn->ibc_peer->ibp_nid, - atomic_read (&conn->ibc_refcount)); - atomic_inc (&conn->ibc_refcount); kibnal_queue_tx_locked (tx, conn); } @@ -1663,45 +1728,37 @@ kibnal_connreq_done (kib_conn_t *conn, int active, int status) /* queue up all the receives */ for (i = 0; i < IBNAL_RX_MSGS; i++) { /* +1 ref for rx desc */ - CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n", - conn, conn->ibc_state, conn->ibc_peer->ibp_nid, - atomic_read (&conn->ibc_refcount)); - atomic_inc (&conn->ibc_refcount); + kibnal_conn_addref(conn); CDEBUG(D_NET, "RX[%d] %p->%p - "LPX64"\n", i, &conn->ibc_rxs[i], conn->ibc_rxs[i].rx_msg, conn->ibc_rxs[i].rx_vaddr); - kibnal_post_rx (&conn->ibc_rxs[i], 0); + kibnal_post_rx (&conn->ibc_rxs[i], 0, 0); } kibnal_check_sends (conn); return; } - /* connection failed */ - if (state == IBNAL_CONN_CONNECTING) { - /* schedule for 
reaper to close */ + if (status == 0) { + /* connection established, but peer was deleted. Schedule for + * reaper to cm_disconnect... */ + status = -ECONNABORTED; kibnal_close_conn_locked (conn, status); } else { - /* Don't have a CM comm_id; just wait for refs to drain */ + /* just waiting for refs to drain */ conn->ibc_state = IBNAL_CONN_ZOMBIE; } write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags); kibnal_peer_connect_failed (conn->ibc_peer, active, status); - - if (state != IBNAL_CONN_CONNECTING) { - /* drop caller's ref if we're not waiting for the - * IB_CM_IDLE callback */ - kibnal_put_conn (conn); - } } int -kibnal_accept (kib_conn_t **connp, tTS_IB_CM_COMM_ID cid, - kib_msg_t *msg, int nob) +kibnal_accept_connreq (kib_conn_t **connp, tTS_IB_CM_COMM_ID cid, + kib_msg_t *msg, int nob) { kib_conn_t *conn; kib_peer_t *peer; @@ -1709,23 +1766,24 @@ kibnal_accept (kib_conn_t **connp, tTS_IB_CM_COMM_ID cid, unsigned long flags; int rc; - rc = kibnal_unpack_msg(msg, nob); + rc = kibnal_unpack_msg(msg, 0, nob); if (rc != 0) { CERROR("Can't unpack connreq msg: %d\n", rc); return -EPROTO; } - CDEBUG(D_NET, "connreq from "LPX64"\n", msg->ibm_srcnid); + CDEBUG(D_NET, "connreq from %s\n", libcfs_nid2str(msg->ibm_srcnid)); if (msg->ibm_type != IBNAL_MSG_CONNREQ) { - CERROR("Unexpected connreq msg type: %x from "LPX64"\n", - msg->ibm_type, msg->ibm_srcnid); + CERROR("Unexpected connreq msg type: %x from %s\n", + msg->ibm_type, libcfs_nid2str(msg->ibm_srcnid)); return -EPROTO; } if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) { - CERROR("Can't accept "LPX64": bad queue depth %d (%d expected)\n", - msg->ibm_srcnid, msg->ibm_u.connparams.ibcp_queue_depth, + CERROR("Can't accept %s: bad queue depth %d (%d expected)\n", + libcfs_nid2str(msg->ibm_srcnid), + msg->ibm_u.connparams.ibcp_queue_depth, IBNAL_MSG_QUEUE_SIZE); return (-EPROTO); } @@ -1735,47 +1793,68 @@ kibnal_accept (kib_conn_t **connp, tTS_IB_CM_COMM_ID cid, return (-ENOMEM); /* assume 'nid' is a new peer */ - peer = kibnal_create_peer (msg->ibm_srcnid); - if (peer == NULL) { - CDEBUG(D_NET, "--conn[%p] state %d -> "LPX64" (%d)\n", - conn, conn->ibc_state, conn->ibc_peer->ibp_nid, - atomic_read (&conn->ibc_refcount)); - atomic_dec (&conn->ibc_refcount); - kibnal_destroy_conn(conn); + rc = kibnal_create_peer(&peer, msg->ibm_srcnid); + if (rc != 0) { + kibnal_conn_decref(conn); return (-ENOMEM); } write_lock_irqsave (&kibnal_data.kib_global_lock, flags); + if (kibnal_data.kib_nonewpeers) { + write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags); + + CERROR ("Shutdown has started, drop connreq from %s\n", + libcfs_nid2str(msg->ibm_srcnid)); + kibnal_conn_decref(conn); + kibnal_peer_decref(peer); + return -ESHUTDOWN; + } + /* Check I'm the same instance that gave the connection parameters. 
* NB If my incarnation changes after this, the peer will get nuked and * we'll spot that when the connection is finally added into the peer's * connlist */ - if (msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid || + if (kibnal_data.kib_ni->ni_nid != msg->ibm_dstnid || msg->ibm_dststamp != kibnal_data.kib_incarnation) { write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags); - CERROR("Stale connection params from "LPX64"\n", - msg->ibm_srcnid); - atomic_dec(&conn->ibc_refcount); - kibnal_destroy_conn(conn); - kibnal_put_peer(peer); + CERROR("Stale connection params from %s\n", + libcfs_nid2str(msg->ibm_srcnid)); + kibnal_conn_decref(conn); + kibnal_peer_decref(peer); return -ESTALE; } peer2 = kibnal_find_peer_locked(msg->ibm_srcnid); if (peer2 == NULL) { + /* Brand new peer */ + LASSERT (peer->ibp_accepting == 0); + /* peer table takes my ref on peer */ list_add_tail (&peer->ibp_list, kibnal_nid2peerlist(msg->ibm_srcnid)); } else { - kibnal_put_peer (peer); + /* tie-break connection race in favour of the higher NID */ + if (peer2->ibp_connecting != 0 && + msg->ibm_srcnid < kibnal_data.kib_ni->ni_nid) { + write_unlock_irqrestore(&kibnal_data.kib_global_lock, + flags); + CWARN("Conn race %s\n", + libcfs_nid2str(peer2->ibp_nid)); + + kibnal_conn_decref(conn); + kibnal_peer_decref(peer); + return -EALREADY; + } + + kibnal_peer_decref(peer); peer = peer2; } /* +1 ref for conn */ - atomic_inc (&peer->ibp_refcount); - peer->ibp_connecting++; + kibnal_peer_addref(peer); + peer->ibp_accepting++; write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags); @@ -1784,110 +1863,105 @@ kibnal_accept (kib_conn_t **connp, tTS_IB_CM_COMM_ID cid, conn->ibc_comm_id = cid; conn->ibc_incarnation = msg->ibm_srcstamp; conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE; + conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE; + conn->ibc_version = msg->ibm_version; *connp = conn; return (0); } tTS_IB_CM_CALLBACK_RETURN -kibnal_idle_conn_callback (tTS_IB_CM_EVENT event, - tTS_IB_CM_COMM_ID cid, - void *param, - void *arg) +kibnal_bad_conn_callback (tTS_IB_CM_EVENT event, + tTS_IB_CM_COMM_ID cid, + void *param, + void *arg) { - /* Shouldn't ever get a callback after TS_IB_CM_IDLE */ CERROR ("Unexpected event %d: conn %p\n", event, arg); LBUG (); return TS_IB_CM_CALLBACK_PROCEED; } -tTS_IB_CM_CALLBACK_RETURN -kibnal_conn_callback (tTS_IB_CM_EVENT event, - tTS_IB_CM_COMM_ID cid, - void *param, - void *arg) +void +kibnal_abort_txs (kib_conn_t *conn, struct list_head *txs) { - kib_conn_t *conn = arg; LIST_HEAD (zombies); struct list_head *tmp; struct list_head *nxt; kib_tx_t *tx; unsigned long flags; - int done; + + spin_lock_irqsave (&conn->ibc_lock, flags); + + list_for_each_safe (tmp, nxt, txs) { + tx = list_entry (tmp, kib_tx_t, tx_list); + + if (txs == &conn->ibc_active_txs) { + LASSERT (tx->tx_passive_rdma || + !tx->tx_passive_rdma_wait); + + LASSERT (tx->tx_passive_rdma_wait || + tx->tx_sending != 0); + } else { + LASSERT (!tx->tx_passive_rdma_wait); + LASSERT (tx->tx_sending == 0); + } + + tx->tx_status = -ECONNABORTED; + tx->tx_passive_rdma_wait = 0; + + if (tx->tx_sending == 0) { + list_del (&tx->tx_list); + list_add (&tx->tx_list, &zombies); + } + } + + spin_unlock_irqrestore (&conn->ibc_lock, flags); + + kibnal_txlist_done (&zombies, -ECONNABORTED); +} + +tTS_IB_CM_CALLBACK_RETURN +kibnal_conn_callback (tTS_IB_CM_EVENT event, + tTS_IB_CM_COMM_ID cid, + void *param, + void *arg) +{ + kib_conn_t *conn = arg; int rc; /* Established Connection Notifier */ switch (event) { default: - CERROR("Connection %p -> "LPX64" 
@@ -1784,110 +1863,105 @@ kibnal_accept (kib_conn_t **connp, tTS_IB_CM_COMM_ID cid,
         conn->ibc_comm_id = cid;
         conn->ibc_incarnation = msg->ibm_srcstamp;
         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
+        conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
+        conn->ibc_version = msg->ibm_version;
 
         *connp = conn;
         return (0);
 }
 
 tTS_IB_CM_CALLBACK_RETURN
-kibnal_idle_conn_callback (tTS_IB_CM_EVENT event,
-                           tTS_IB_CM_COMM_ID cid,
-                           void *param,
-                           void *arg)
+kibnal_bad_conn_callback (tTS_IB_CM_EVENT event,
+                          tTS_IB_CM_COMM_ID cid,
+                          void *param,
+                          void *arg)
 {
-        /* Shouldn't ever get a callback after TS_IB_CM_IDLE */
         CERROR ("Unexpected event %d: conn %p\n", event, arg);
         LBUG ();
         return TS_IB_CM_CALLBACK_PROCEED;
 }
 
-tTS_IB_CM_CALLBACK_RETURN
-kibnal_conn_callback (tTS_IB_CM_EVENT event,
-                      tTS_IB_CM_COMM_ID cid,
-                      void *param,
-                      void *arg)
+void
+kibnal_abort_txs (kib_conn_t *conn, struct list_head *txs)
 {
-        kib_conn_t       *conn = arg;
         LIST_HEAD        (zombies);
         struct list_head *tmp;
         struct list_head *nxt;
         kib_tx_t         *tx;
         unsigned long     flags;
-        int               done;
+
+        spin_lock_irqsave (&conn->ibc_lock, flags);
+
+        list_for_each_safe (tmp, nxt, txs) {
+                tx = list_entry (tmp, kib_tx_t, tx_list);
+
+                if (txs == &conn->ibc_active_txs) {
+                        LASSERT (tx->tx_passive_rdma ||
+                                 !tx->tx_passive_rdma_wait);
+
+                        LASSERT (tx->tx_passive_rdma_wait ||
+                                 tx->tx_sending != 0);
+                } else {
+                        LASSERT (!tx->tx_passive_rdma_wait);
+                        LASSERT (tx->tx_sending == 0);
+                }
+
+                tx->tx_status = -ECONNABORTED;
+                tx->tx_passive_rdma_wait = 0;
+
+                if (tx->tx_sending == 0) {
+                        list_del (&tx->tx_list);
+                        list_add (&tx->tx_list, &zombies);
+                }
+        }
+
+        spin_unlock_irqrestore (&conn->ibc_lock, flags);
+
+        kibnal_txlist_done (&zombies, -ECONNABORTED);
+}
+
+tTS_IB_CM_CALLBACK_RETURN
+kibnal_conn_callback (tTS_IB_CM_EVENT event,
+                      tTS_IB_CM_COMM_ID cid,
+                      void *param,
+                      void *arg)
+{
+        kib_conn_t       *conn = arg;
         int               rc;
 
         /* Established Connection Notifier */
 
         switch (event) {
         default:
-                CERROR("Connection %p -> "LPX64" ERROR %d\n",
-                       conn, conn->ibc_peer->ibp_nid, event);
+                CDEBUG(D_NETERROR, "Connection %p -> %s ERROR %d\n",
+                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), event);
                 kibnal_close_conn (conn, -ECONNABORTED);
                 break;
 
         case TS_IB_CM_DISCONNECTED:
-                CDEBUG(D_WARNING, "Connection %p -> "LPX64" DISCONNECTED.\n",
-                       conn, conn->ibc_peer->ibp_nid);
+                CDEBUG(D_NETERROR, "Connection %p -> %s DISCONNECTED.\n",
+                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                 kibnal_close_conn (conn, 0);
                 break;
 
         case TS_IB_CM_IDLE:
-                CDEBUG(D_NET, "Connection %p -> "LPX64" IDLE.\n",
-                       conn, conn->ibc_peer->ibp_nid);
-                kibnal_put_conn (conn);        /* Lose CM's ref */
+                CDEBUG(D_NET, "Connection %p -> %s IDLE.\n",
+                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
 
                 /* LASSERT (no further callbacks) */
-                rc = tsIbCmCallbackModify(cid,
-                                          kibnal_idle_conn_callback, conn);
+                rc = tsIbCmCallbackModify(cid, kibnal_bad_conn_callback, conn);
                 LASSERT (rc == 0);
 
                 /* NB we wait until the connection has closed before
                  * completing outstanding passive RDMAs so we can be sure
                  * the network can't touch the mapped memory any more. */
-                spin_lock_irqsave (&conn->ibc_lock, flags);
-
-                /* grab passive RDMAs not waiting for the tx callback */
-                list_for_each_safe (tmp, nxt, &conn->ibc_active_txs) {
-                        tx = list_entry (tmp, kib_tx_t, tx_list);
-
-                        LASSERT (tx->tx_passive_rdma ||
-                                 !tx->tx_passive_rdma_wait);
-
-                        LASSERT (tx->tx_passive_rdma_wait ||
-                                 tx->tx_sending != 0);
-
-                        /* still waiting for tx callback? */
-                        if (!tx->tx_passive_rdma_wait)
-                                continue;
-
-                        tx->tx_status = -ECONNABORTED;
-                        tx->tx_passive_rdma_wait = 0;
-                        done = (tx->tx_sending == 0);
-
-                        if (!done)
-                                continue;
-
-                        list_del (&tx->tx_list);
-                        list_add (&tx->tx_list, &zombies);
-                }
-
-                /* grab all blocked transmits */
-                list_for_each_safe (tmp, nxt, &conn->ibc_tx_queue) {
-                        tx = list_entry (tmp, kib_tx_t, tx_list);
-
-                        list_del (&tx->tx_list);
-                        list_add (&tx->tx_list, &zombies);
-                }
+                kibnal_abort_txs(conn, &conn->ibc_tx_queue);
+                kibnal_abort_txs(conn, &conn->ibc_tx_queue_rsrvd);
+                kibnal_abort_txs(conn, &conn->ibc_tx_queue_nocred);
+                kibnal_abort_txs(conn, &conn->ibc_active_txs);
 
-                spin_unlock_irqrestore (&conn->ibc_lock, flags);
-
-                while (!list_empty(&zombies)) {
-                        tx = list_entry (zombies.next, kib_tx_t, tx_list);
-
-                        list_del(&tx->tx_list);
-                        kibnal_tx_done (tx);
-                }
+                kibnal_conn_decref(conn);       /* Lose CM's ref */
                 break;
         }
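kibnal_abort_txs() above, now called once per queue from the IDLE case, follows a classic kernel pattern: while holding the spinlock, unhook everything that can be completed onto a private "zombies" list; only after dropping the lock, run the completions, since completion work may sleep or take other locks. A simplified userspace sketch of the pattern, not part of the patch, with invented names:

#include <pthread.h>

struct tx {
        struct tx *next;
        int        status;
};

/* Hypothetical completion: must not be called with queue_lock held. */
extern void tx_complete(struct tx *tx);

/* Assumed initialised elsewhere with pthread_spin_init(). */
static pthread_spinlock_t queue_lock;

static void
abort_queue(struct tx **queue, int error)
{
        struct tx *zombies = NULL;
        struct tx *tx;

        pthread_spin_lock(&queue_lock);
        while ((tx = *queue) != NULL) {         /* unhook under the lock... */
                *queue = tx->next;
                tx->status = error;
                tx->next = zombies;
                zombies = tx;
        }
        pthread_spin_unlock(&queue_lock);

        while ((tx = zombies) != NULL) {        /* ...complete outside it */
                zombies = tx->next;
                tx_complete(tx);
        }
}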
"IDLE" : "Unexpected", + conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), event); + kibnal_connreq_done(conn, 0, -ECONNABORTED); + kibnal_conn_decref(conn); /* drop CM's ref */ + return TS_IB_CM_CALLBACK_ABORT; case TS_IB_CM_REQ_RECEIVED: { struct ib_cm_req_received_param *req = param; @@ -1923,13 +1999,13 @@ kibnal_passive_conn_callback (tTS_IB_CM_EVENT event, LASSERT (conn == NULL); /* Don't really know srcnid until successful unpack */ - CDEBUG(D_NET, "REQ from ?"LPX64"?\n", msg->ibm_srcnid); + CDEBUG(D_NET, "REQ from ?%s?\n", libcfs_nid2str(msg->ibm_srcnid)); - rc = kibnal_accept(&conn, cid, msg, - req->remote_private_data_len); + rc = kibnal_accept_connreq(&conn, cid, msg, + req->remote_private_data_len); if (rc != 0) { - CERROR ("Can't accept ?"LPX64"?: %d\n", - msg->ibm_srcnid, rc); + CERROR ("Can't accept ?%s?: %d\n", + libcfs_nid2str(msg->ibm_srcnid), rc); return TS_IB_CM_CALLBACK_ABORT; } @@ -1943,7 +2019,7 @@ kibnal_passive_conn_callback (tTS_IB_CM_EVENT event, msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE; - kibnal_pack_msg(msg, 0, + kibnal_pack_msg(msg, conn->ibc_version, 0, conn->ibc_peer->ibp_nid, conn->ibc_incarnation); @@ -1955,29 +2031,27 @@ kibnal_passive_conn_callback (tTS_IB_CM_EVENT event, req->accept_param.flow_control = IBNAL_FLOW_CONTROL; CDEBUG(D_NET, "Proceeding\n"); - break; + return TS_IB_CM_CALLBACK_PROCEED; /* CM takes my ref on conn */ } case TS_IB_CM_ESTABLISHED: LASSERT (conn != NULL); - CDEBUG(D_WARNING, "Connection %p -> "LPX64" ESTABLISHED.\n", - conn, conn->ibc_peer->ibp_nid); + CWARN("Connection %p -> %s ESTABLISHED.\n", + conn, libcfs_nid2str(conn->ibc_peer->ibp_nid)); - kibnal_connreq_done (conn, 0, 0); - break; + kibnal_connreq_done(conn, 0, 0); + return TS_IB_CM_CALLBACK_PROCEED; } - - /* NB if the connreq is done, we switch to kibnal_conn_callback */ - return TS_IB_CM_CALLBACK_PROCEED; } tTS_IB_CM_CALLBACK_RETURN kibnal_active_conn_callback (tTS_IB_CM_EVENT event, - tTS_IB_CM_COMM_ID cid, - void *param, - void *arg) + tTS_IB_CM_COMM_ID cid, + void *param, + void *arg) { - kib_conn_t *conn = arg; + kib_conn_t *conn = arg; + unsigned long flags; switch (event) { case TS_IB_CM_REP_RECEIVED: { @@ -1986,71 +2060,79 @@ kibnal_active_conn_callback (tTS_IB_CM_EVENT event, int nob = rep->remote_private_data_len; int rc; - rc = kibnal_unpack_msg(msg, nob); + rc = kibnal_unpack_msg(msg, conn->ibc_version, nob); if (rc != 0) { - CERROR ("Error %d unpacking conn ack from "LPX64"\n", - rc, conn->ibc_peer->ibp_nid); - kibnal_connreq_done (conn, 1, rc); - break; + CERROR ("Error %d unpacking conn ack from %s\n", + rc, libcfs_nid2str(conn->ibc_peer->ibp_nid)); + kibnal_connreq_done(conn, 1, rc); + kibnal_conn_decref(conn); /* drop CM's ref */ + return TS_IB_CM_CALLBACK_ABORT; } if (msg->ibm_type != IBNAL_MSG_CONNACK) { - CERROR ("Unexpected conn ack type %d from "LPX64"\n", - msg->ibm_type, conn->ibc_peer->ibp_nid); - kibnal_connreq_done (conn, 1, -EPROTO); - break; + CERROR ("Unexpected conn ack type %d from %s\n", + msg->ibm_type, + libcfs_nid2str(conn->ibc_peer->ibp_nid)); + kibnal_connreq_done(conn, 1, -EPROTO); + kibnal_conn_decref(conn); /* drop CM's ref */ + return TS_IB_CM_CALLBACK_ABORT; } - if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid || + if (conn->ibc_peer->ibp_nid != msg->ibm_srcnid || + kibnal_data.kib_ni->ni_nid != msg->ibm_dstnid || msg->ibm_srcstamp != conn->ibc_incarnation || - msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid || msg->ibm_dststamp != kibnal_data.kib_incarnation) { - CERROR("Stale conn ack from "LPX64"\n", - 
 
 tTS_IB_CM_CALLBACK_RETURN
 kibnal_active_conn_callback (tTS_IB_CM_EVENT event,
-                             tTS_IB_CM_COMM_ID cid,
-                             void *param,
-                             void *arg)
+                             tTS_IB_CM_COMM_ID cid,
+                             void *param,
+                             void *arg)
 {
-        kib_conn_t *conn = arg;
+        kib_conn_t    *conn = arg;
+        unsigned long  flags;
 
         switch (event) {
         case TS_IB_CM_REP_RECEIVED: {
@@ -1986,71 +2060,79 @@ kibnal_active_conn_callback (tTS_IB_CM_EVENT event,
                 int    nob = rep->remote_private_data_len;
                 int    rc;
 
-                rc = kibnal_unpack_msg(msg, nob);
+                rc = kibnal_unpack_msg(msg, conn->ibc_version, nob);
                 if (rc != 0) {
-                        CERROR ("Error %d unpacking conn ack from "LPX64"\n",
-                                rc, conn->ibc_peer->ibp_nid);
-                        kibnal_connreq_done (conn, 1, rc);
-                        break;
+                        CERROR ("Error %d unpacking conn ack from %s\n",
+                                rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
+                        kibnal_connreq_done(conn, 1, rc);
+                        kibnal_conn_decref(conn); /* drop CM's ref */
+                        return TS_IB_CM_CALLBACK_ABORT;
                 }
 
                 if (msg->ibm_type != IBNAL_MSG_CONNACK) {
-                        CERROR ("Unexpected conn ack type %d from "LPX64"\n",
-                                msg->ibm_type, conn->ibc_peer->ibp_nid);
-                        kibnal_connreq_done (conn, 1, -EPROTO);
-                        break;
+                        CERROR ("Unexpected conn ack type %d from %s\n",
+                                msg->ibm_type,
+                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
+                        kibnal_connreq_done(conn, 1, -EPROTO);
+                        kibnal_conn_decref(conn); /* drop CM's ref */
+                        return TS_IB_CM_CALLBACK_ABORT;
                 }
 
-                if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
+                if (conn->ibc_peer->ibp_nid != msg->ibm_srcnid ||
+                    kibnal_data.kib_ni->ni_nid != msg->ibm_dstnid ||
                     msg->ibm_srcstamp != conn->ibc_incarnation ||
-                    msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid ||
                     msg->ibm_dststamp != kibnal_data.kib_incarnation) {
-                        CERROR("Stale conn ack from "LPX64"\n",
-                               conn->ibc_peer->ibp_nid);
-                        kibnal_connreq_done (conn, 1, -ESTALE);
-                        break;
+                        CERROR("Stale conn ack from %s\n",
+                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
+                        kibnal_connreq_done(conn, 1, -ESTALE);
+                        kibnal_conn_decref(conn); /* drop CM's ref */
+                        return TS_IB_CM_CALLBACK_ABORT;
                 }
 
                 if (msg->ibm_u.connparams.ibcp_queue_depth !=
                     IBNAL_MSG_QUEUE_SIZE) {
-                        CERROR ("Bad queue depth %d from "LPX64"\n",
+                        CERROR ("Bad queue depth %d from %s\n",
                                 msg->ibm_u.connparams.ibcp_queue_depth,
-                                conn->ibc_peer->ibp_nid);
-                        kibnal_connreq_done (conn, 1, -EPROTO);
-                        break;
+                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
+                        kibnal_connreq_done(conn, 1, -EPROTO);
+                        kibnal_conn_decref(conn); /* drop CM's ref */
+                        return TS_IB_CM_CALLBACK_ABORT;
                 }
 
-                CDEBUG(D_NET, "Connection %p -> "LPX64" REP_RECEIVED.\n",
-                       conn, conn->ibc_peer->ibp_nid);
+                CDEBUG(D_NET, "Connection %p -> %s REP_RECEIVED.\n",
+                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
 
                 conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
-                break;
+                conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
+                return TS_IB_CM_CALLBACK_PROCEED;
         }
 
         case TS_IB_CM_ESTABLISHED:
-                CDEBUG(D_WARNING, "Connection %p -> "LPX64" ESTABLISHED\n",
-                       conn, conn->ibc_peer->ibp_nid);
+                CWARN("Connection %p -> %s ESTABLISHED\n",
+                      conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
 
-                kibnal_connreq_done (conn, 1, 0);
-                break;
+                kibnal_connreq_done(conn, 1, 0);
+                return TS_IB_CM_CALLBACK_PROCEED;
 
         case TS_IB_CM_IDLE:
-                CERROR("Connection %p -> "LPX64" IDLE\n",
-                       conn, conn->ibc_peer->ibp_nid);
-                /* Back out state change: I'm disengaged from CM */
-                conn->ibc_state = IBNAL_CONN_INIT_QP;
-
-                kibnal_connreq_done (conn, 1, -ECONNABORTED);
-                break;
+                CDEBUG(D_NETERROR, "Connection %p -> %s IDLE\n",
+                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
+                /* I assume this connection attempt was rejected because the
+                 * peer found a stale QP; I'll just try again */
+                write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
+                kibnal_schedule_active_connect_locked(conn->ibc_peer);
+                write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
+
+                kibnal_connreq_done(conn, 1, -ECONNABORTED);
+                kibnal_conn_decref(conn); /* drop CM's ref */
+                return TS_IB_CM_CALLBACK_ABORT;
 
         default:
-                CERROR("Connection %p -> "LPX64" ERROR %d\n",
-                       conn, conn->ibc_peer->ibp_nid, event);
-                kibnal_connreq_done (conn, 1, -ECONNABORTED);
-                break;
+                CDEBUG(D_NETERROR, "Connection %p -> %s ERROR %d\n",
+                       conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), event);
+                kibnal_connreq_done(conn, 1, -ECONNABORTED);
+                kibnal_conn_decref(conn); /* drop CM's ref */
+                return TS_IB_CM_CALLBACK_ABORT;
         }
-
-        /* NB if the connreq is done, we switch to kibnal_conn_callback */
-        return TS_IB_CM_CALLBACK_PROCEED;
 }
 
 int
@@ -2063,16 +2145,19 @@ kibnal_pathreq_callback (tTS_IB_CLIENT_QUERY_TID tid, int status,
         kib_msg_t *msg = &conn->ibc_connreq->cr_msg;
 
         if (status != 0) {
-                CERROR ("status %d\n", status);
-                kibnal_connreq_done (conn, 1, status);
-                goto out;
+                CDEBUG (D_NETERROR, "Pathreq %p -> %s failed: %d\n",
+                        conn, libcfs_nid2str(peer->ibp_nid), status);
+                kibnal_connreq_done(conn, 1, status);
+                kibnal_conn_decref(conn); /* drop callback's ref */
+                return 1; /* non-zero prevents further callbacks */
         }
 
         conn->ibc_connreq->cr_path = *resp;
 
         kibnal_init_msg(msg, IBNAL_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
         msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
-        kibnal_pack_msg(msg, 0, peer->ibp_nid, conn->ibc_incarnation);
+        kibnal_pack_msg(msg, conn->ibc_version, 0,
+                        peer->ibp_nid, conn->ibc_incarnation);
 
         conn->ibc_connreq->cr_connparam = (struct ib_cm_active_param) {
                 .qp                   = conn->ibc_qp,
@@ -2082,7 +2167,7 @@ kibnal_pathreq_callback (tTS_IB_CLIENT_QUERY_TID tid, int status,
                 .initiator_depth      = IBNAL_RESPONDER_RESOURCES,
                 .retry_count          = IBNAL_RETRY,
                 .rnr_retry_count      = IBNAL_RNR_RETRY,
-                .cm_response_timeout  = kibnal_tunables.kib_io_timeout,
+                .cm_response_timeout  = *kibnal_tunables.kib_timeout,
                 .max_cm_retries       = IBNAL_CM_RETRY,
                 .flow_control         = IBNAL_FLOW_CONTROL,
         };
@@ -2093,8 +2178,9 @@ kibnal_pathreq_callback (tTS_IB_CLIENT_QUERY_TID tid, int status,
         /* Flag I'm getting involved with the CM... */
         conn->ibc_state = IBNAL_CONN_CONNECTING;
 
-        CDEBUG(D_NET, "Connecting to, service id "LPX64", on "LPX64"\n",
-               conn->ibc_connreq->cr_svcrsp.ibsr_svc_id, peer->ibp_nid);
+        CDEBUG(D_NET, "Connecting to, service id "LPX64", on %s\n",
+               conn->ibc_connreq->cr_svcrsp.ibsr_svc_id,
+               libcfs_nid2str(peer->ibp_nid));
 
         /* kibnal_connect_callback gets my conn ref */
         status = ib_cm_connect (&conn->ibc_connreq->cr_connparam,
@@ -2103,15 +2189,15 @@ kibnal_pathreq_callback (tTS_IB_CLIENT_QUERY_TID tid, int status,
                                 kibnal_active_conn_callback, conn,
                                 &conn->ibc_comm_id);
         if (status != 0) {
-                CERROR ("Connect: %d\n", status);
+                CERROR ("Connect %p -> %s failed: %d\n",
+                        conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), status);
                 /* Back out state change: I've not got a CM comm_id yet... */
                 conn->ibc_state = IBNAL_CONN_INIT_QP;
-                kibnal_connreq_done (conn, 1, status);
+                kibnal_connreq_done(conn, 1, status);
+                kibnal_conn_decref(conn); /* Drop callback's ref */
         }
 
- out:
-        /* return non-zero to prevent further callbacks */
-        return 1;
+        return 1;       /* non-zero to prevent further callbacks */
 }
 
 void
@@ -2128,12 +2214,13 @@ kibnal_connect_peer (kib_peer_t *peer)
         }
 
         conn->ibc_peer = peer;
-        atomic_inc (&peer->ibp_refcount);
+        kibnal_peer_addref(peer);
 
-        PORTAL_ALLOC (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
+        LIBCFS_ALLOC (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
         if (conn->ibc_connreq == NULL) {
                 CERROR ("Can't allocate connreq\n");
-                kibnal_connreq_done (conn, 1, -ENOMEM);
+                kibnal_connreq_done(conn, 1, -ENOMEM);
+                kibnal_conn_decref(conn); /* drop my ref */
                 return;
         }
 
@@ -2142,6 +2229,7 @@ kibnal_connect_peer (kib_peer_t *peer)
         rc = kibnal_make_svcqry(conn);
         if (rc != 0) {
                 kibnal_connreq_done (conn, 1, rc);
+                kibnal_conn_decref(conn); /* drop my ref */
                 return;
         }
 
@@ -2157,56 +2245,60 @@ kibnal_connect_peer (kib_peer_t *peer)
                                     conn->ibc_connreq->cr_svcrsp.ibsr_svc_gid,
                                     conn->ibc_connreq->cr_svcrsp.ibsr_svc_pkey,
                                     0,
-                                    kibnal_tunables.kib_io_timeout * HZ,
+                                    *kibnal_tunables.kib_timeout * HZ,
                                     0,
                                     kibnal_pathreq_callback, conn,
                                     &conn->ibc_connreq->cr_tid);
 
         if (rc == 0)
-                return;
+                return; /* callback now has my ref on conn */
 
-        CERROR ("Path record request: %d\n", rc);
-        kibnal_connreq_done (conn, 1, rc);
+        CERROR ("Path record request %p -> %s failed: %d\n",
+                conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
+        kibnal_connreq_done(conn, 1, rc);
+        kibnal_conn_decref(conn); /* drop my ref */
 }
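kibnal_pathreq_callback() and kibnal_connect_peer() above keep repeating one refcounting discipline, made explicit by the added kibnal_conn_decref() next to each kibnal_connreq_done(): the reference the caller holds travels with a successfully armed callback, and every synchronous failure path must drop it locally. A minimal userspace model of the handoff, not part of the patch, with invented names:

#include <stdatomic.h>

struct conn {
        atomic_int refcount;
};

/* Hypothetical async starter: on success (0) the eventual callback owns
 * the reference the caller took; on failure nothing was armed. */
extern int  start_async_op(struct conn *c);
extern void conn_decref(struct conn *c);

static int
connect_start(struct conn *c)
{
        int rc;

        atomic_fetch_add(&c->refcount, 1);      /* +1 ref for the async op */

        rc = start_async_op(c);
        if (rc == 0)
                return 0;       /* callback now owns the ref I just took */

        conn_decref(c);         /* failed synchronously: drop it myself */
        return rc;
}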
 
 int
-kibnal_conn_timed_out (kib_conn_t *conn)
+kibnal_check_txs (kib_conn_t *conn, struct list_head *txs)
 {
         kib_tx_t          *tx;
         struct list_head  *ttmp;
         unsigned long      flags;
+        int                timed_out = 0;
 
         spin_lock_irqsave (&conn->ibc_lock, flags);
 
-        list_for_each (ttmp, &conn->ibc_tx_queue) {
+        list_for_each (ttmp, txs) {
                 tx = list_entry (ttmp, kib_tx_t, tx_list);
 
-                LASSERT (!tx->tx_passive_rdma_wait);
-                LASSERT (tx->tx_sending == 0);
+                if (txs == &conn->ibc_active_txs) {
+                        LASSERT (tx->tx_passive_rdma ||
+                                 !tx->tx_passive_rdma_wait);
 
-                if (time_after_eq (jiffies, tx->tx_deadline)) {
-                        spin_unlock_irqrestore (&conn->ibc_lock, flags);
-                        return 1;
+                        LASSERT (tx->tx_passive_rdma_wait ||
+                                 tx->tx_sending != 0);
+                } else {
+                        LASSERT (!tx->tx_passive_rdma_wait);
+                        LASSERT (tx->tx_sending == 0);
                 }
-        }
-
-        list_for_each (ttmp, &conn->ibc_active_txs) {
-                tx = list_entry (ttmp, kib_tx_t, tx_list);
-
-                LASSERT (tx->tx_passive_rdma ||
-                         !tx->tx_passive_rdma_wait);
-
-                LASSERT (tx->tx_passive_rdma_wait ||
-                         tx->tx_sending != 0);
-
+
                 if (time_after_eq (jiffies, tx->tx_deadline)) {
-                        spin_unlock_irqrestore (&conn->ibc_lock, flags);
-                        return 1;
+                        timed_out = 1;
+                        break;
                 }
         }
 
         spin_unlock_irqrestore (&conn->ibc_lock, flags);
+        return timed_out;
+}
 
-        return 0;
+int
+kibnal_conn_timed_out (kib_conn_t *conn)
+{
+        return  kibnal_check_txs(conn, &conn->ibc_tx_queue) ||
+                kibnal_check_txs(conn, &conn->ibc_tx_queue_rsrvd) ||
+                kibnal_check_txs(conn, &conn->ibc_tx_queue_nocred) ||
+                kibnal_check_txs(conn, &conn->ibc_active_txs);
 }
 
 void
@@ -2217,12 +2309,13 @@
         kib_peer_t         *peer;
         kib_conn_t         *conn;
         struct list_head   *ctmp;
+        unsigned long       flags;
 
  again:
         /* NB. We expect to have a look at all the peers and not find any
          * rdmas to time out, so we just use a shared lock while we
          * take a look... */
-        read_lock (&kibnal_data.kib_global_lock);
+        read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
 
         list_for_each (ptmp, peers) {
                 peer = list_entry (ptmp, kib_peer_t, ibp_list);
@@ -2241,25 +2334,23 @@ kibnal_check_conns (int idx)
                         if (!kibnal_conn_timed_out(conn))
                                 continue;
 
-                        CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
-                               conn, conn->ibc_state, peer->ibp_nid,
-                               atomic_read (&conn->ibc_refcount));
+                        kibnal_conn_addref(conn);
 
-                        atomic_inc (&conn->ibc_refcount);
-                        read_unlock (&kibnal_data.kib_global_lock);
+                        read_unlock_irqrestore(&kibnal_data.kib_global_lock,
+                                               flags);
 
-                        CERROR("Timed out RDMA with "LPX64"\n",
-                               peer->ibp_nid);
+                        CERROR("Timed out RDMA with %s\n",
+                               libcfs_nid2str(peer->ibp_nid));
 
                         kibnal_close_conn (conn, -ETIMEDOUT);
-                        kibnal_put_conn (conn);
+                        kibnal_conn_decref(conn);
 
                         /* start again now I've dropped the lock */
                         goto again;
                 }
         }
 
-        read_unlock (&kibnal_data.kib_global_lock);
+        read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
 }
 
 void
@@ -2273,8 +2364,10 @@ kibnal_terminate_conn (kib_conn_t *conn)
 
         rc = ib_cm_disconnect (conn->ibc_comm_id);
         if (rc != 0)
-                CERROR ("Error %d disconnecting conn %p -> "LPX64"\n",
-                        rc, conn, conn->ibc_peer->ibp_nid);
+                CERROR ("Error %d disconnecting conn %p -> %s\n",
+                        rc, conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
+
+        kibnal_peer_notify(conn->ibc_peer);
 }
 
 int
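The timeout scans above compare jiffies against tx_deadline with time_after_eq(), which is wraparound-safe: it reduces to a signed comparison of the difference, so it stays correct when the tick counter overflows, as long as the two values are within half the counter range of each other. A standalone model (the sketch is mine, not the patch's):

#include <stdint.h>

/* Equivalent of the kernel's time_after_eq(a, b) for a 32-bit tick
 * counter: true iff 'a' is at or after 'b', modulo wraparound, assuming
 * the two values are within 2^31 ticks of each other. */
static inline int
ticks_after_eq(uint32_t a, uint32_t b)
{
        return (int32_t)(a - b) >= 0;
}

/* A tx armed with deadline = now + timeout then times out when
 * ticks_after_eq(now, deadline) first becomes true, even if the counter
 * wrapped in between. */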
@@ -2288,8 +2381,8 @@
         int                peer_index = 0;
         unsigned long      deadline = jiffies;
 
-        kportal_daemonize ("kibnal_reaper");
-        kportal_blockallsigs ();
+        cfs_daemonize ("kibnal_reaper");
+        cfs_block_allsigs ();
 
         init_waitqueue_entry (&wait, current);
 
@@ -2310,9 +2403,10 @@
                                  * callback and last ref reschedules it
                                  * here... */
                                 kibnal_terminate_conn(conn);
-                                kibnal_put_conn (conn);
+                                kibnal_conn_decref(conn);
                                 break;
-
+
+                        case IBNAL_CONN_INIT_QP:
                         case IBNAL_CONN_ZOMBIE:
                                 kibnal_destroy_conn (conn);
                                 break;
@@ -2343,9 +2437,9 @@
                          * connection within (n+1)/n times the timeout
                          * interval. */
-                        if (kibnal_tunables.kib_io_timeout > n * p)
+                        if (*kibnal_tunables.kib_timeout > n * p)
                                 chunk = (chunk * n * p) /
-                                        kibnal_tunables.kib_io_timeout;
+                                        *kibnal_tunables.kib_timeout;
                         if (chunk == 0)
                                 chunk = 1;
 
@@ -2389,8 +2483,8 @@
         int                did_something;
 
         snprintf(name, sizeof(name), "kibnal_connd_%02ld", id);
-        kportal_daemonize(name);
-        kportal_blockallsigs();
+        cfs_daemonize(name);
+        cfs_block_allsigs();
 
         init_waitqueue_entry (&wait, current);
 
@@ -2407,32 +2501,37 @@
                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
 
                         kibnal_handle_svcqry(as->ibas_sock);
-                        sock_release(as->ibas_sock);
-                        PORTAL_FREE(as, sizeof(*as));
+                        kibnal_free_acceptsock(as);
 
                         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
                         did_something = 1;
                 }
 
-                if (!list_empty (&kibnal_data.kib_connd_peers)) {
+                /* Only handle an outgoing connection request if there is someone left
+                 * to handle an incoming svcqry */
+                if (!list_empty (&kibnal_data.kib_connd_peers) &&
+                    ((kibnal_data.kib_connd_connecting + 1) <
+                     *kibnal_tunables.kib_n_connd)) {
                         peer = list_entry (kibnal_data.kib_connd_peers.next,
                                            kib_peer_t, ibp_connd_list);
 
                         list_del_init (&peer->ibp_connd_list);
+                        kibnal_data.kib_connd_connecting++;
                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
 
                         kibnal_connect_peer (peer);
-                        kibnal_put_peer (peer);
+                        kibnal_peer_decref(peer);
 
                         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                         did_something = 1;
+                        kibnal_data.kib_connd_connecting--;
                 }
 
                 if (did_something)
                         continue;
 
                 set_current_state (TASK_INTERRUPTIBLE);
-                add_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
+                add_wait_queue_exclusive(&kibnal_data.kib_connd_waitq, &wait);
 
                 spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
 
@@ -2463,8 +2562,8 @@
         int                did_something;
 
         snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
-        kportal_daemonize(name);
-        kportal_blockallsigs();
+        cfs_daemonize(name);
+        cfs_block_allsigs();
 
         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
 
@@ -2504,13 +2603,13 @@
                         counter = 0;
 
                         if (!did_something) {
-                                rc = wait_event_interruptible(
+                                rc = wait_event_interruptible_exclusive(
                                         kibnal_data.kib_sched_waitq,
                                         !list_empty(&kibnal_data.kib_sched_txq) ||
                                         !list_empty(&kibnal_data.kib_sched_rxq) ||
                                         kibnal_data.kib_shutdown);
                         } else {
-                                our_cond_resched();
+                                cfs_cond_resched();
                         }
 
                         spin_lock_irqsave(&kibnal_data.kib_sched_lock,
@@ -2523,13 +2622,3 @@ kibnal_scheduler(void *arg)
         kibnal_thread_fini();
         return (0);
 }
-
-
-lib_nal_t kibnal_lib = {
-        libnal_data:       &kibnal_data,      /* NAL private data */
-        libnal_send:       kibnal_send,
-        libnal_send_pages: kibnal_send_pages,
-        libnal_recv:       kibnal_recv,
-        libnal_recv_pages: kibnal_recv_pages,
-        libnal_dist:       kibnal_dist
-};
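The reaper's chunk arithmetic (the hunk against old lines 2343-2351 above) paces the peer-table scan so every connection is checked roughly n times per timeout interval, hence a dead connection is noticed within about (n+1)/n times the timeout. Below is a standalone sketch of just that arithmetic; the meanings of n (sweeps per timeout) and p (seconds between wakeups) are my reading of the surrounding reaper code, which this hunk does not show:

#include <stdio.h>

/* How many hash buckets to scan on this wakeup.  'size' is the bucket
 * count, 'timeout' the RDMA timeout in seconds, 'n' the desired sweeps
 * per timeout and 'p' the seconds between wakeups (both assumed). */
static int
scan_chunk(int size, int timeout, int n, int p)
{
        int chunk = size;               /* default: whole table per wakeup */

        if (timeout > n * p)            /* long timeout: spread the sweep */
                chunk = (chunk * n * p) / timeout;
        if (chunk == 0)
                chunk = 1;              /* always make some progress */

        return chunk;
}

int
main(void)
{
        /* e.g. 1009 buckets, 50s timeout, 4 sweeps, 1s wakeups:
         * 1009 * 4 / 50 = 80 buckets per wakeup */
        printf("%d buckets per wakeup\n", scan_chunk(1009, 50, 4, 1));
        return 0;
}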