*
*/
-#include "openibnal.h"
+#include "openiblnd.h"
/*
* LIB functions follow
void
kibnal_tx_done (kib_tx_t *tx)
{
- ptl_err_t ptlrc = (tx->tx_status == 0) ? PTL_OK : PTL_FAIL;
+ lnet_msg_t *lntmsg[2];
unsigned long flags;
int i;
int rc;
LASSERT (tx->tx_sending == 0); /* mustn't be awaiting callback */
LASSERT (!tx->tx_passive_rdma_wait); /* mustn't be awaiting RDMA */
+ if (in_interrupt()) {
+ /* can't deregister memory/flush FMAs/finalize in IRQ context... */
+ kibnal_schedule_tx_done(tx);
+ return;
+ }
+
switch (tx->tx_mapped) {
default:
LBUG();
break;
case KIB_TX_MAPPED:
- if (in_interrupt()) {
- /* can't deregister memory in IRQ context... */
- kibnal_schedule_tx_done(tx);
- return;
- }
rc = ib_memory_deregister(tx->tx_md.md_handle.mr);
LASSERT (rc == 0);
tx->tx_mapped = KIB_TX_UNMAPPED;
#if IBNAL_FMR
case KIB_TX_MAPPED_FMR:
- if (in_interrupt() && tx->tx_status != 0) {
- /* can't flush FMRs in IRQ context... */
- kibnal_schedule_tx_done(tx);
- return;
- }
-
rc = ib_fmr_deregister(tx->tx_md.md_handle.fmr);
LASSERT (rc == 0);
+#ifndef USING_TSAPI
+ /* Somewhat belt-and-braces since the tx's conn has closed if
+ * this was a passive RDMA waiting to complete... */
if (tx->tx_status != 0)
ib_fmr_pool_force_flush(kibnal_data.kib_fmr_pool);
+#endif
tx->tx_mapped = KIB_TX_UNMAPPED;
break;
#endif
}
- for (i = 0; i < 2; i++) {
- /* tx may have up to 2 libmsgs to finalise */
- if (tx->tx_libmsg[i] == NULL)
- continue;
+ /* tx may have up to 2 lnet msgs to finalise */
+ lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
+ lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
+ rc = tx->tx_status;
- lib_finalize (&kibnal_lib, NULL, tx->tx_libmsg[i], ptlrc);
- tx->tx_libmsg[i] = NULL;
- }
-
if (tx->tx_conn != NULL) {
- kibnal_put_conn (tx->tx_conn);
+ kibnal_conn_decref(tx->tx_conn);
tx->tx_conn = NULL;
}
spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);
- if (tx->tx_isnblk) {
- list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_nblk_txs);
- } else {
- list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_txs);
- wake_up (&kibnal_data.kib_idle_tx_waitq);
- }
+ list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_txs);
spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
+
+ /* delay finalize until my descs have been freed */
+ for (i = 0; i < 2; i++) {
+ if (lntmsg[i] == NULL)
+ continue;
+
+ lnet_finalize (kibnal_data.kib_ni, lntmsg[i], rc);
+ }
}
kib_tx_t *
-kibnal_get_idle_tx (int may_block)
+kibnal_get_idle_tx (void)
{
unsigned long flags;
- kib_tx_t *tx = NULL;
+ kib_tx_t *tx;
- for (;;) {
- spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);
-
- /* "normal" descriptor is free */
- if (!list_empty (&kibnal_data.kib_idle_txs)) {
- tx = list_entry (kibnal_data.kib_idle_txs.next,
- kib_tx_t, tx_list);
- break;
- }
-
- if (!may_block) {
- /* may dip into reserve pool */
- if (list_empty (&kibnal_data.kib_idle_nblk_txs)) {
- CERROR ("reserved tx desc pool exhausted\n");
- break;
- }
-
- tx = list_entry (kibnal_data.kib_idle_nblk_txs.next,
- kib_tx_t, tx_list);
- break;
- }
+ spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);
- /* block for idle tx */
+ if (list_empty (&kibnal_data.kib_idle_txs)) {
spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
-
- wait_event (kibnal_data.kib_idle_tx_waitq,
- !list_empty (&kibnal_data.kib_idle_txs) ||
- kibnal_data.kib_shutdown);
+ return NULL;
}
- if (tx != NULL) {
- list_del (&tx->tx_list);
+ tx = list_entry (kibnal_data.kib_idle_txs.next, kib_tx_t, tx_list);
+ list_del (&tx->tx_list);
- /* Allocate a new passive RDMA completion cookie. It might
- * not be needed, but we've got a lock right now and we're
- * unlikely to wrap... */
- tx->tx_passive_rdma_cookie = kibnal_data.kib_next_tx_cookie++;
-
- LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
- LASSERT (tx->tx_nsp == 0);
- LASSERT (tx->tx_sending == 0);
- LASSERT (tx->tx_status == 0);
- LASSERT (tx->tx_conn == NULL);
- LASSERT (!tx->tx_passive_rdma);
- LASSERT (!tx->tx_passive_rdma_wait);
- LASSERT (tx->tx_libmsg[0] == NULL);
- LASSERT (tx->tx_libmsg[1] == NULL);
- }
+ /* Allocate a new passive RDMA completion cookie. It might not be
+ * needed, but we've got a lock right now and we're unlikely to
+ * wrap... */
+ tx->tx_passive_rdma_cookie = kibnal_data.kib_next_tx_cookie++;
spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
-
- return (tx);
-}
-
-int
-kibnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
-{
- /* I would guess that if kibnal_get_peer (nid) == NULL,
- and we're not routing, then 'nid' is very distant :) */
- if ( nal->libnal_ni.ni_pid.nid == nid ) {
- *dist = 0;
- } else {
- *dist = 1;
- }
- return 0;
+ LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
+ LASSERT (tx->tx_nsp == 0);
+ LASSERT (tx->tx_sending == 0);
+ LASSERT (tx->tx_status == 0);
+ LASSERT (tx->tx_conn == NULL);
+ LASSERT (!tx->tx_passive_rdma);
+ LASSERT (!tx->tx_passive_rdma_wait);
+ LASSERT (tx->tx_lntmsg[0] == NULL);
+ LASSERT (tx->tx_lntmsg[1] == NULL);
+
+ return tx;
}
void
CDEBUG(D_NET, "Complete %p "LPD64": %d\n", tx, cookie, status);
+ /* XXX Set mlength of reply here */
+
tx->tx_status = status;
tx->tx_passive_rdma_wait = 0;
idle = (tx->tx_sending == 0);
spin_unlock_irqrestore (&conn->ibc_lock, flags);
- CERROR ("Unmatched (late?) RDMA completion "LPX64" from "LPX64"\n",
- cookie, conn->ibc_peer->ibp_nid);
+ CERROR ("Unmatched (late?) RDMA completion "LPX64" from %s\n",
+ cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
}
void
-kibnal_post_rx (kib_rx_t *rx, int do_credits)
+kibnal_post_rx (kib_rx_t *rx, int credit, int rsrvd_credit)
{
kib_conn_t *conn = rx->rx_conn;
int rc;
unsigned long flags;
+ LASSERT(!rsrvd_credit ||
+ conn->ibc_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
+
rx->rx_gl = (struct ib_gather_scatter) {
.address = rx->rx_vaddr,
.length = IBNAL_MSG_SIZE,
};
LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
- LASSERT (!rx->rx_posted);
- rx->rx_posted = 1;
+ LASSERT (rx->rx_nob >= 0); /* not posted */
+ rx->rx_nob = -1; /* is now */
mb();
if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
rc = -ECONNABORTED;
else
- rc = ib_receive (conn->ibc_qp, &rx->rx_sp, 1);
+ rc = kibnal_ib_receive(conn->ibc_qp, &rx->rx_sp);
if (rc == 0) {
- if (do_credits) {
+ if (credit || rsrvd_credit) {
spin_lock_irqsave(&conn->ibc_lock, flags);
- conn->ibc_outstanding_credits++;
+
+ if (credit)
+ conn->ibc_outstanding_credits++;
+ if (rsrvd_credit)
+ conn->ibc_reserved_credits++;
+
spin_unlock_irqrestore(&conn->ibc_lock, flags);
kibnal_check_sends(conn);
}
if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
- CERROR ("Error posting receive -> "LPX64": %d\n",
- conn->ibc_peer->ibp_nid, rc);
+ CERROR ("Error posting receive -> %s: %d\n",
+ libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
kibnal_close_conn (rx->rx_conn, rc);
} else {
- CDEBUG (D_NET, "Error posting receive -> "LPX64": %d\n",
- conn->ibc_peer->ibp_nid, rc);
+ CDEBUG (D_NET, "Error posting receive -> %s: %d\n",
+ libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
}
/* Drop rx's ref */
- kibnal_put_conn (conn);
-}
-
-#if IBNAL_CKSUM
-__u32 kibnal_cksum (void *ptr, int nob)
-{
- char *c = ptr;
- __u32 sum = 0;
-
- while (nob-- > 0)
- sum = ((sum << 1) | (sum >> 31)) + *c++;
-
- return (sum);
+ kibnal_conn_decref(conn);
}
-#endif
void
kibnal_rx_callback (struct ib_cq_entry *e)
kib_rx_t *rx = (kib_rx_t *)kibnal_wreqid2ptr(e->work_request_id);
kib_msg_t *msg = rx->rx_msg;
kib_conn_t *conn = rx->rx_conn;
- int nob = e->bytes_transferred;
- const int base_nob = offsetof(kib_msg_t, ibm_u);
int credits;
- int flipped;
unsigned long flags;
-#if IBNAL_CKSUM
- __u32 msg_cksum;
- __u32 computed_cksum;
-#endif
+ int rc;
+ int err = -ECONNABORTED;
CDEBUG (D_NET, "rx %p conn %p\n", rx, conn);
- LASSERT (rx->rx_posted);
- rx->rx_posted = 0;
+ LASSERT (rx->rx_nob < 0); /* was posted */
+ rx->rx_nob = 0; /* isn't now */
mb();
/* receives complete with error in any case after we've started
LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED);
if (e->status != IB_COMPLETION_STATUS_SUCCESS) {
- CERROR("Rx from "LPX64" failed: %d\n",
- conn->ibc_peer->ibp_nid, e->status);
- goto failed;
- }
-
- if (nob < base_nob) {
- CERROR ("Short rx from "LPX64": %d\n",
- conn->ibc_peer->ibp_nid, nob);
+ CERROR("Rx from %s failed: %d\n",
+ libcfs_nid2str(conn->ibc_peer->ibp_nid), e->status);
goto failed;
}
- /* Receiver does any byte flipping if necessary... */
-
- if (msg->ibm_magic == IBNAL_MSG_MAGIC) {
- flipped = 0;
- } else {
- if (msg->ibm_magic != __swab32(IBNAL_MSG_MAGIC)) {
- CERROR ("Unrecognised magic: %08x from "LPX64"\n",
- msg->ibm_magic, conn->ibc_peer->ibp_nid);
- goto failed;
- }
- flipped = 1;
- __swab16s (&msg->ibm_version);
- LASSERT (sizeof(msg->ibm_type) == 1);
- LASSERT (sizeof(msg->ibm_credits) == 1);
- }
-
- if (msg->ibm_version != IBNAL_MSG_VERSION) {
- CERROR ("Incompatible msg version %d (%d expected)\n",
- msg->ibm_version, IBNAL_MSG_VERSION);
- goto failed;
- }
+ LASSERT (e->bytes_transferred >= 0);
+ rx->rx_nob = e->bytes_transferred;
+ mb();
-#if IBNAL_CKSUM
- if (nob != msg->ibm_nob) {
- CERROR ("Unexpected # bytes %d (%d expected)\n", nob, msg->ibm_nob);
+ rc = kibnal_unpack_msg(msg, conn->ibc_version, rx->rx_nob);
+ if (rc != 0) {
+ CERROR ("Error %d unpacking rx from %s\n",
+ rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
goto failed;
}
- msg_cksum = le32_to_cpu(msg->ibm_cksum);
- msg->ibm_cksum = 0;
- computed_cksum = kibnal_cksum (msg, nob);
-
- if (msg_cksum != computed_cksum) {
- CERROR ("Checksum failure %d: (%d expected)\n",
- computed_cksum, msg_cksum);
+ if (!lnet_ptlcompat_matchnid(conn->ibc_peer->ibp_nid,
+ msg->ibm_srcnid) ||
+ !lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
+ msg->ibm_dstnid) ||
+ msg->ibm_srcstamp != conn->ibc_incarnation ||
+ msg->ibm_dststamp != kibnal_data.kib_incarnation) {
+ CERROR ("Stale rx from %s\n",
+ libcfs_nid2str(conn->ibc_peer->ibp_nid));
+ err = -ESTALE;
goto failed;
}
- CDEBUG(D_NET, "cksum %x, nob %d\n", computed_cksum, nob);
-#endif
/* Have I received credits that will let me send? */
credits = msg->ibm_credits;
switch (msg->ibm_type) {
case IBNAL_MSG_NOOP:
- kibnal_post_rx (rx, 1);
+ kibnal_post_rx (rx, 1, 0);
return;
case IBNAL_MSG_IMMEDIATE:
- if (nob < base_nob + sizeof (kib_immediate_msg_t)) {
- CERROR ("Short IMMEDIATE from "LPX64": %d\n",
- conn->ibc_peer->ibp_nid, nob);
- goto failed;
- }
break;
case IBNAL_MSG_PUT_RDMA:
case IBNAL_MSG_GET_RDMA:
- if (nob < base_nob + sizeof (kib_rdma_msg_t)) {
- CERROR ("Short RDMA msg from "LPX64": %d\n",
- conn->ibc_peer->ibp_nid, nob);
- goto failed;
- }
- if (flipped) {
- __swab32s(&msg->ibm_u.rdma.ibrm_desc.rd_key);
- __swab32s(&msg->ibm_u.rdma.ibrm_desc.rd_nob);
- __swab64s(&msg->ibm_u.rdma.ibrm_desc.rd_addr);
- }
CDEBUG(D_NET, "%d RDMA: cookie "LPX64", key %x, addr "LPX64", nob %d\n",
msg->ibm_type, msg->ibm_u.rdma.ibrm_cookie,
msg->ibm_u.rdma.ibrm_desc.rd_key,
case IBNAL_MSG_PUT_DONE:
case IBNAL_MSG_GET_DONE:
- if (nob < base_nob + sizeof (kib_completion_msg_t)) {
- CERROR ("Short COMPLETION msg from "LPX64": %d\n",
- conn->ibc_peer->ibp_nid, nob);
- goto failed;
- }
- if (flipped)
- __swab32s(&msg->ibm_u.completion.ibcm_status);
-
CDEBUG(D_NET, "%d DONE: cookie "LPX64", status %d\n",
msg->ibm_type, msg->ibm_u.completion.ibcm_cookie,
msg->ibm_u.completion.ibcm_status);
kibnal_complete_passive_rdma (conn,
msg->ibm_u.completion.ibcm_cookie,
msg->ibm_u.completion.ibcm_status);
- kibnal_post_rx (rx, 1);
+
+ if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {
+ kibnal_post_rx (rx, 1, 0);
+ } else {
+ /* this reply buffer was pre-reserved */
+ kibnal_post_rx (rx, 0, 1);
+ }
return;
default:
- CERROR ("Can't parse type from "LPX64": %d\n",
- conn->ibc_peer->ibp_nid, msg->ibm_type);
+ CERROR ("Bad msg type %x from %s\n",
+ msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
goto failed;
}
+ kibnal_peer_alive(conn->ibc_peer);
+
/* schedule for kibnal_rx() in thread context */
spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
failed:
CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
- kibnal_close_conn(conn, -ECONNABORTED);
+ kibnal_close_conn(conn, err);
/* Don't re-post rx & drop its ref on conn */
- kibnal_put_conn(conn);
+ kibnal_conn_decref(conn);
}
void
kibnal_rx (kib_rx_t *rx)
{
+ int rc = 0;
kib_msg_t *msg = rx->rx_msg;
- /* Clear flag so I can detect if I've sent an RDMA completion */
- rx->rx_rdma = 0;
-
switch (msg->ibm_type) {
case IBNAL_MSG_GET_RDMA:
- lib_parse(&kibnal_lib, &msg->ibm_u.rdma.ibrm_hdr, rx);
- /* If the incoming get was matched, I'll have initiated the
- * RDMA and the completion message... */
- if (rx->rx_rdma)
- break;
-
- /* Otherwise, I'll send a failed completion now to prevent
- * the peer's GET blocking for the full timeout. */
- CERROR ("Completing unmatched RDMA GET from "LPX64"\n",
- rx->rx_conn->ibc_peer->ibp_nid);
- kibnal_start_active_rdma (IBNAL_MSG_GET_DONE, -EIO,
- rx, NULL, 0, NULL, NULL, 0, 0);
+ rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.rdma.ibrm_hdr,
+ msg->ibm_srcnid, rx, 1);
break;
case IBNAL_MSG_PUT_RDMA:
- lib_parse(&kibnal_lib, &msg->ibm_u.rdma.ibrm_hdr, rx);
- if (rx->rx_rdma)
- break;
- /* This is most unusual, since even if lib_parse() didn't
- * match anything, it should have asked us to read (and
- * discard) the payload. The portals header must be
- * inconsistent with this message type, so it's the
- * sender's fault for sending garbage and she can time
- * herself out... */
- CERROR ("Uncompleted RMDA PUT from "LPX64"\n",
- rx->rx_conn->ibc_peer->ibp_nid);
+ rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.rdma.ibrm_hdr,
+ msg->ibm_srcnid, rx, 1);
break;
case IBNAL_MSG_IMMEDIATE:
- lib_parse(&kibnal_lib, &msg->ibm_u.immediate.ibim_hdr, rx);
- LASSERT (!rx->rx_rdma);
+ rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.immediate.ibim_hdr,
+ msg->ibm_srcnid, rx, 0);
break;
-
+
default:
LBUG();
break;
}
- kibnal_post_rx (rx, 1);
+ if (rc < 0) {
+ kibnal_close_conn(rx->rx_conn, rc);
+ kibnal_post_rx (rx, 1, 0);
+ }
}
#if 0
if (vaddr >= VMALLOC_START &&
vaddr < VMALLOC_END)
page = vmalloc_to_page ((void *)vaddr);
-#if CONFIG_HIGHMEM
+#ifdef CONFIG_HIGHMEM
else if (vaddr >= PKMAP_BASE &&
vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
page = vmalloc_to_page ((void *)vaddr);
!VALID_PAGE (page))
return (-EFAULT);
- *physp = kibnal_page2phys(page) + (vaddr & (PAGE_SIZE - 1));
+ *physp = lnet_page2phys(page) + (vaddr & (PAGE_SIZE - 1));
return (0);
}
#endif
int
-kibnal_map_iov (kib_tx_t *tx, enum ib_memory_access access,
- int niov, struct iovec *iov, int offset, int nob)
+kibnal_map_iov (kib_tx_t *tx, int access,
+ unsigned int niov, struct iovec *iov, int offset, int nob)
{
void *vaddr;
}
int
-kibnal_map_kiov (kib_tx_t *tx, enum ib_memory_access access,
- int nkiov, ptl_kiov_t *kiov,
+kibnal_map_kiov (kib_tx_t *tx, int access,
+ int nkiov, lnet_kiov_t *kiov,
int offset, int nob)
{
#if IBNAL_FMR
}
phys_size = nkiov * sizeof (*phys);
- PORTAL_ALLOC(phys, phys_size);
+ LIBCFS_ALLOC(phys, phys_size);
if (phys == NULL) {
CERROR ("Can't allocate tmp phys\n");
return (-ENOMEM);
page_offset = kiov->kiov_offset + offset;
#if IBNAL_FMR
- phys[0] = kibnal_page2phys(kiov->kiov_page);
+ phys[0] = lnet_page2phys(kiov->kiov_page);
#else
- phys[0].address = kibnal_page2phys(kiov->kiov_page);
+ phys[0].address = lnet_page2phys(kiov->kiov_page);
phys[0].size = PAGE_SIZE;
#endif
nphys = 1;
goto out;
}
- if (nphys == PTL_MD_MAX_IOV) {
+ if (nphys == LNET_MAX_IOV) {
CERROR ("payload too big (%d)\n", nphys);
rc = -EMSGSIZE;
goto out;
LASSERT (nphys * sizeof (*phys) < phys_size);
#if IBNAL_FMR
- phys[nphys] = kibnal_page2phys(kiov->kiov_page);
+ phys[nphys] = lnet_page2phys(kiov->kiov_page);
#else
- phys[nphys].address = kibnal_page2phys(kiov->kiov_page);
+ phys[nphys].address = lnet_page2phys(kiov->kiov_page);
phys[nphys].size = PAGE_SIZE;
#endif
nphys++;
resid -= PAGE_SIZE;
}
-#if 0
- CWARN ("nphys %d, nob %d, page_offset %d\n", nphys, nob, page_offset);
- for (rc = 0; rc < nphys; rc++)
- CWARN (" [%d] "LPX64" / %d\n", rc, phys[rc].address, phys[rc].size);
-#endif
tx->tx_md.md_addr = IBNAL_RDMA_BASE;
#if IBNAL_FMR
}
out:
- PORTAL_FREE(phys, phys_size);
+ LIBCFS_FREE(phys, phys_size);
return (rc);
}
kib_tx_t *tx;
int rc;
int i;
+ int consume_credit;
int done;
int nwork;
spin_lock_irqsave (&conn->ibc_lock, flags);
- LASSERT (conn->ibc_nsends_posted <= IBNAL_MSG_QUEUE_SIZE);
+ LASSERT (conn->ibc_nsends_posted <= IBNAL_RX_MSGS);
+ LASSERT (conn->ibc_reserved_credits >= 0);
+
+ while (conn->ibc_reserved_credits > 0 &&
+ !list_empty(&conn->ibc_tx_queue_rsrvd)) {
+ LASSERT (conn->ibc_version !=
+ IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
+ tx = list_entry(conn->ibc_tx_queue_rsrvd.next,
+ kib_tx_t, tx_list);
+ list_del(&tx->tx_list);
+ list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
+ conn->ibc_reserved_credits--;
+ }
if (list_empty(&conn->ibc_tx_queue) &&
- conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER) {
+ list_empty(&conn->ibc_tx_queue_nocred) &&
+ (conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER ||
+ kibnal_send_keepalive(conn))) {
spin_unlock_irqrestore(&conn->ibc_lock, flags);
- tx = kibnal_get_idle_tx(0); /* don't block */
+ tx = kibnal_get_idle_tx();
if (tx != NULL)
kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);
spin_lock_irqsave(&conn->ibc_lock, flags);
- if (tx != NULL) {
- atomic_inc(&conn->ibc_refcount);
+ if (tx != NULL)
kibnal_queue_tx_locked(tx, conn);
- }
}
- while (!list_empty (&conn->ibc_tx_queue)) {
- tx = list_entry (conn->ibc_tx_queue.next, kib_tx_t, tx_list);
+ for (;;) {
+ if (!list_empty(&conn->ibc_tx_queue_nocred)) {
+ LASSERT (conn->ibc_version !=
+ IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
+ tx = list_entry(conn->ibc_tx_queue_nocred.next,
+ kib_tx_t, tx_list);
+ consume_credit = 0;
+ } else if (!list_empty (&conn->ibc_tx_queue)) {
+ tx = list_entry (conn->ibc_tx_queue.next,
+ kib_tx_t, tx_list);
+ consume_credit = 1;
+ } else {
+ /* nothing waiting */
+ break;
+ }
/* We rely on this for QP sizing */
LASSERT (tx->tx_nsp > 0 && tx->tx_nsp <= 2);
/* Not on ibc_rdma_queue */
LASSERT (!tx->tx_passive_rdma_wait);
- if (conn->ibc_nsends_posted == IBNAL_MSG_QUEUE_SIZE)
+ if (conn->ibc_nsends_posted == IBNAL_RX_MSGS)
break;
- if (conn->ibc_credits == 0) /* no credits */
- break;
+ if (consume_credit) {
+ if (conn->ibc_credits == 0) /* no credits */
+ break;
+
+ if (conn->ibc_credits == 1 && /* last credit reserved for */
+ conn->ibc_outstanding_credits == 0) /* giving back credits */
+ break;
+ }
- if (conn->ibc_credits == 1 && /* last credit reserved for */
- conn->ibc_outstanding_credits == 0) /* giving back credits */
- break;
-
list_del (&tx->tx_list);
if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
(!list_empty(&conn->ibc_tx_queue) ||
- conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER)) {
+ !list_empty(&conn->ibc_tx_queue_nocred) ||
+ (conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER &&
+ !kibnal_send_keepalive(conn)))) {
/* redundant NOOP */
spin_unlock_irqrestore(&conn->ibc_lock, flags);
kibnal_tx_done(tx);
continue;
}
- tx->tx_msg->ibm_credits = conn->ibc_outstanding_credits;
- conn->ibc_outstanding_credits = 0;
+ kibnal_pack_msg(tx->tx_msg, conn->ibc_version,
+ conn->ibc_outstanding_credits,
+ conn->ibc_peer->ibp_nid, conn->ibc_incarnation);
+ conn->ibc_outstanding_credits = 0;
conn->ibc_nsends_posted++;
- conn->ibc_credits--;
+ if (consume_credit)
+ conn->ibc_credits--;
tx->tx_sending = tx->tx_nsp;
tx->tx_passive_rdma_wait = tx->tx_passive_rdma;
list_add (&tx->tx_list, &conn->ibc_active_txs);
-#if IBNAL_CKSUM
- tx->tx_msg->ibm_cksum = 0;
- tx->tx_msg->ibm_cksum = kibnal_cksum(tx->tx_msg, tx->tx_msg->ibm_nob);
- CDEBUG(D_NET, "cksum %x, nob %d\n", tx->tx_msg->ibm_cksum, tx->tx_msg->ibm_nob);
-#endif
+
spin_unlock_irqrestore (&conn->ibc_lock, flags);
/* NB the gap between removing tx from the queue and sending it
tx->tx_status = 0;
/* Driver only accepts 1 item at a time */
for (i = 0; i < tx->tx_nsp; i++) {
- rc = ib_send (conn->ibc_qp, &tx->tx_sp[i], 1);
+ rc = kibnal_ib_send(conn->ibc_qp, &tx->tx_sp[i]);
if (rc != 0)
break;
nwork++;
}
}
+ conn->ibc_last_send = jiffies;
+
spin_lock_irqsave (&conn->ibc_lock, flags);
if (rc != 0) {
/* NB credits are transferred in the actual
* message, which can only be the last work item */
conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
- conn->ibc_credits++;
+ if (consume_credit)
+ conn->ibc_credits++;
conn->ibc_nsends_posted--;
tx->tx_status = rc;
spin_unlock_irqrestore (&conn->ibc_lock, flags);
if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
- CERROR ("Error %d posting transmit to "LPX64"\n",
- rc, conn->ibc_peer->ibp_nid);
+ CERROR ("Error %d posting transmit to %s\n",
+ rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
else
- CDEBUG (D_NET, "Error %d posting transmit to "
- LPX64"\n", rc, conn->ibc_peer->ibp_nid);
+ CDEBUG (D_NET, "Error %d posting transmit to %s\n",
+ rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
kibnal_close_conn (conn, rc);
if (idle)
list_del(&tx->tx_list);
- CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
- conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
- atomic_read (&conn->ibc_refcount));
- atomic_inc (&conn->ibc_refcount);
+ kibnal_conn_addref(conn);
if (tx->tx_sending == 0)
conn->ibc_nsends_posted--;
kibnal_tx_done (tx);
if (e->status != IB_COMPLETION_STATUS_SUCCESS) {
- CERROR ("Tx completion to "LPX64" failed: %d\n",
- conn->ibc_peer->ibp_nid, e->status);
+ CDEBUG (D_NETERROR, "Tx completion to %s failed: %d\n",
+ libcfs_nid2str(conn->ibc_peer->ibp_nid), e->status);
kibnal_close_conn (conn, -ENETDOWN);
} else {
+ kibnal_peer_alive(conn->ibc_peer);
/* can I shovel some more sends out the door? */
kibnal_check_sends(conn);
}
- kibnal_put_conn (conn);
+ kibnal_conn_decref(conn);
}
void
-kibnal_callback (struct ib_cq *cq, struct ib_cq_entry *e, void *arg)
+kibnal_callback (ib_cq_t *cq, struct ib_cq_entry *e, void *arg)
{
if (kibnal_wreqid_is_rx(e->work_request_id))
kibnal_rx_callback (e);
LASSERT (tx->tx_nsp >= 0 &&
tx->tx_nsp < sizeof(tx->tx_sp)/sizeof(tx->tx_sp[0]));
LASSERT (nob <= IBNAL_MSG_SIZE);
-
- tx->tx_msg->ibm_magic = IBNAL_MSG_MAGIC;
- tx->tx_msg->ibm_version = IBNAL_MSG_VERSION;
- tx->tx_msg->ibm_type = type;
-#if IBNAL_CKSUM
- tx->tx_msg->ibm_nob = nob;
-#endif
+
+ kibnal_init_msg(tx->tx_msg, type, body_nob);
+
/* Fence the message if it's bundled with an RDMA read */
fence = (tx->tx_nsp > 0) &&
(type == IBNAL_MSG_PUT_DONE);
}
void
-kibnal_launch_tx (kib_tx_t *tx, ptl_nid_t nid)
+kibnal_schedule_active_connect_locked (kib_peer_t *peer)
+{
+ /* Called with exclusive kib_global_lock */
+
+ peer->ibp_connecting++;
+ kibnal_peer_addref(peer); /* extra ref for connd */
+
+ spin_lock (&kibnal_data.kib_connd_lock);
+
+ LASSERT (list_empty(&peer->ibp_connd_list));
+ list_add_tail (&peer->ibp_connd_list,
+ &kibnal_data.kib_connd_peers);
+ wake_up (&kibnal_data.kib_connd_waitq);
+
+ spin_unlock (&kibnal_data.kib_connd_lock);
+}
+
+void
+kibnal_launch_tx (kib_tx_t *tx, lnet_nid_t nid)
{
unsigned long flags;
kib_peer_t *peer;
kib_conn_t *conn;
+ int retry;
+ int rc;
rwlock_t *g_lock = &kibnal_data.kib_global_lock;
/* If I get here, I've committed to send, so I complete the tx with
LASSERT (tx->tx_conn == NULL); /* only set when assigned a conn */
LASSERT (tx->tx_nsp > 0); /* work items have been set up */
- read_lock (g_lock);
+ for (retry = 0; ; retry = 1) {
+ read_lock_irqsave(g_lock, flags);
- peer = kibnal_find_peer_locked (nid);
- if (peer == NULL) {
- read_unlock (g_lock);
- tx->tx_status = -EHOSTUNREACH;
- kibnal_tx_done (tx);
- return;
- }
-
- conn = kibnal_find_conn_locked (peer);
- if (conn != NULL) {
- CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
- conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
- atomic_read (&conn->ibc_refcount));
- atomic_inc (&conn->ibc_refcount); /* 1 ref for the tx */
- read_unlock (g_lock);
+ peer = kibnal_find_peer_locked (nid);
+ if (peer != NULL) {
+ conn = kibnal_find_conn_locked (peer);
+ if (conn != NULL) {
+ kibnal_conn_addref(conn); /* 1 ref for me...*/
+ read_unlock_irqrestore(g_lock, flags);
- kibnal_queue_tx (tx, conn);
- return;
- }
-
- /* Making one or more connections; I'll need a write lock... */
- read_unlock (g_lock);
- write_lock_irqsave (g_lock, flags);
+ kibnal_queue_tx (tx, conn);
+ kibnal_conn_decref(conn); /* ...until here */
+ return;
+ }
+ }
+
+ /* Making one or more connections; I'll need a write lock... */
+ read_unlock(g_lock);
+ write_lock(g_lock);
- peer = kibnal_find_peer_locked (nid);
- if (peer == NULL) {
+ peer = kibnal_find_peer_locked (nid);
+ if (peer != NULL)
+ break;
+
write_unlock_irqrestore (g_lock, flags);
- tx->tx_status = -EHOSTUNREACH;
- kibnal_tx_done (tx);
- return;
+
+ if (retry) {
+ CERROR("Can't find peer %s\n", libcfs_nid2str(nid));
+ tx->tx_status = -EHOSTUNREACH;
+ kibnal_tx_done (tx);
+ return;
+ }
+
+ rc = kibnal_add_persistent_peer(nid, LNET_NIDADDR(nid),
+ lnet_acceptor_port());
+ if (rc != 0) {
+ CERROR("Can't add peer %s: %d\n",
+ libcfs_nid2str(nid), rc);
+ tx->tx_status = rc;
+ kibnal_tx_done(tx);
+ return;
+ }
}
conn = kibnal_find_conn_locked (peer);
if (conn != NULL) {
/* Connection exists; queue message on it */
- CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
- conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
- atomic_read (&conn->ibc_refcount));
- atomic_inc (&conn->ibc_refcount); /* 1 ref for the tx */
+ kibnal_conn_addref(conn); /* +1 ref from me... */
write_unlock_irqrestore (g_lock, flags);
kibnal_queue_tx (tx, conn);
+ kibnal_conn_decref(conn); /* ...until here */
return;
}
- if (peer->ibp_connecting == 0) {
- if (!time_after_eq(jiffies, peer->ibp_reconnect_time)) {
+ if (peer->ibp_connecting == 0 &&
+ peer->ibp_accepting == 0) {
+ if (!(peer->ibp_reconnect_interval == 0 || /* first attempt */
+ time_after_eq(jiffies, peer->ibp_reconnect_time))) {
write_unlock_irqrestore (g_lock, flags);
tx->tx_status = -EHOSTUNREACH;
kibnal_tx_done (tx);
return;
}
- peer->ibp_connecting = 1;
- atomic_inc (&peer->ibp_refcount); /* extra ref for connd */
-
- spin_lock (&kibnal_data.kib_connd_lock);
-
- list_add_tail (&peer->ibp_connd_list,
- &kibnal_data.kib_connd_peers);
- wake_up (&kibnal_data.kib_connd_waitq);
-
- spin_unlock (&kibnal_data.kib_connd_lock);
+ kibnal_schedule_active_connect_locked(peer);
}
/* A connection is being established; queue the message... */
write_unlock_irqrestore (g_lock, flags);
}
-ptl_err_t
-kibnal_start_passive_rdma (int type, ptl_nid_t nid,
- lib_msg_t *libmsg, ptl_hdr_t *hdr)
+void
+kibnal_txlist_done (struct list_head *txlist, int status)
+{
+ kib_tx_t *tx;
+
+ while (!list_empty(txlist)) {
+ tx = list_entry (txlist->next, kib_tx_t, tx_list);
+
+ list_del (&tx->tx_list);
+ /* complete now */
+ tx->tx_status = status;
+ kibnal_tx_done (tx);
+ }
+}
+
+int
+kibnal_start_passive_rdma (int type, lnet_msg_t *lntmsg,
+ int niov, struct iovec *iov, lnet_kiov_t *kiov,
+ int nob)
{
- int nob = libmsg->md->length;
+ lnet_nid_t nid = lntmsg->msg_target.nid;
kib_tx_t *tx;
kib_msg_t *ibmsg;
int rc;
IB_ACCESS_LOCAL_WRITE;
}
- tx = kibnal_get_idle_tx (1); /* May block; caller is an app thread */
- LASSERT (tx != NULL);
+ tx = kibnal_get_idle_tx ();
+ if (tx == NULL) {
+ CERROR("Can't allocate %s txd for %s\n",
+ (type == IBNAL_MSG_PUT_RDMA) ? "PUT/REPLY" : "GET",
+ libcfs_nid2str(nid));
+ return -ENOMEM;
+ }
- if ((libmsg->md->options & PTL_MD_KIOV) == 0)
- rc = kibnal_map_iov (tx, access,
- libmsg->md->md_niov,
- libmsg->md->md_iov.iov,
- 0, nob);
+
+ if (iov != NULL)
+ rc = kibnal_map_iov (tx, access, niov, iov, 0, nob);
else
- rc = kibnal_map_kiov (tx, access,
- libmsg->md->md_niov,
- libmsg->md->md_iov.kiov,
- 0, nob);
+ rc = kibnal_map_kiov (tx, access, niov, kiov, 0, nob);
if (rc != 0) {
- CERROR ("Can't map RDMA for "LPX64": %d\n", nid, rc);
+ CERROR ("Can't map RDMA for %s: %d\n",
+ libcfs_nid2str(nid), rc);
goto failed;
}
if (type == IBNAL_MSG_GET_RDMA) {
/* reply gets finalized when tx completes */
- tx->tx_libmsg[1] = lib_create_reply_msg(&kibnal_lib,
- nid, libmsg);
- if (tx->tx_libmsg[1] == NULL) {
- CERROR ("Can't create reply for GET -> "LPX64"\n",
- nid);
+ tx->tx_lntmsg[1] = lnet_create_reply_msg(kibnal_data.kib_ni,
+ lntmsg);
+ if (tx->tx_lntmsg[1] == NULL) {
+ CERROR ("Can't create reply for GET -> %s\n",
+ libcfs_nid2str(nid));
rc = -ENOMEM;
goto failed;
}
ibmsg = tx->tx_msg;
- ibmsg->ibm_u.rdma.ibrm_hdr = *hdr;
+ ibmsg->ibm_u.rdma.ibrm_hdr = lntmsg->msg_hdr;
ibmsg->ibm_u.rdma.ibrm_cookie = tx->tx_passive_rdma_cookie;
ibmsg->ibm_u.rdma.ibrm_desc.rd_key = tx->tx_md.md_rkey;
ibmsg->ibm_u.rdma.ibrm_desc.rd_addr = tx->tx_md.md_addr;
tx, tx->tx_passive_rdma_cookie, tx->tx_md.md_rkey,
tx->tx_md.md_addr, nob);
- /* libmsg gets finalized when tx completes. */
- tx->tx_libmsg[0] = libmsg;
+ /* lntmsg gets finalized when tx completes. */
+ tx->tx_lntmsg[0] = lntmsg;
kibnal_launch_tx(tx, nid);
- return (PTL_OK);
+ return (0);
failed:
tx->tx_status = rc;
kibnal_tx_done (tx);
- return (PTL_FAIL);
+ return (-EIO);
}
void
kibnal_start_active_rdma (int type, int status,
- kib_rx_t *rx, lib_msg_t *libmsg,
- unsigned int niov,
- struct iovec *iov, ptl_kiov_t *kiov,
- size_t offset, size_t nob)
+ kib_rx_t *rx, lnet_msg_t *lntmsg,
+ unsigned int niov,
+ struct iovec *iov, lnet_kiov_t *kiov,
+ int offset, int nob)
{
kib_msg_t *rxmsg = rx->rx_msg;
kib_msg_t *txmsg;
LASSERT (type == IBNAL_MSG_GET_DONE ||
type == IBNAL_MSG_PUT_DONE);
- /* Flag I'm completing the RDMA. Even if I fail to send the
- * completion message, I will have tried my best so further
- * attempts shouldn't be tried. */
- LASSERT (!rx->rx_rdma);
- rx->rx_rdma = 1;
-
if (type == IBNAL_MSG_GET_DONE) {
access = 0;
rdma_op = IB_OP_RDMA_WRITE;
LASSERT (rxmsg->ibm_type == IBNAL_MSG_PUT_RDMA);
}
- tx = kibnal_get_idle_tx (0); /* Mustn't block */
+ tx = kibnal_get_idle_tx ();
if (tx == NULL) {
- CERROR ("tx descs exhausted on RDMA from "LPX64
+ CERROR ("tx descs exhausted on RDMA from %s"
" completing locally with failure\n",
- rx->rx_conn->ibc_peer->ibp_nid);
- lib_finalize (&kibnal_lib, NULL, libmsg, PTL_NO_SPACE);
+ libcfs_nid2str(rx->rx_conn->ibc_peer->ibp_nid));
+ lnet_finalize (kibnal_data.kib_ni, lntmsg, -ENOMEM);
return;
}
LASSERT (tx->tx_nsp == 0);
niov, iov, offset, nob);
if (rc != 0) {
- CERROR ("Can't map RDMA -> "LPX64": %d\n",
- rx->rx_conn->ibc_peer->ibp_nid, rc);
+ CERROR ("Can't map RDMA -> %s: %d\n",
+ libcfs_nid2str(rx->rx_conn->ibc_peer->ibp_nid),
+ rc);
/* We'll skip the RDMA and complete with failure. */
status = rc;
nob = 0;
if (status == 0 && nob != 0) {
LASSERT (tx->tx_nsp > 1);
- /* RDMA: libmsg gets finalized when the tx completes. This
+ /* RDMA: lntmsg gets finalized when the tx completes. This
* is after the completion message has been sent, which in
* turn is after the RDMA has finished. */
- tx->tx_libmsg[0] = libmsg;
+ tx->tx_lntmsg[0] = lntmsg;
} else {
LASSERT (tx->tx_nsp == 1);
/* No RDMA: local completion happens now! */
- CDEBUG(D_WARNING,"No data: immediate completion\n");
- lib_finalize (&kibnal_lib, NULL, libmsg,
- status == 0 ? PTL_OK : PTL_FAIL);
- }
-
- /* +1 ref for this tx... */
- CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
- rx->rx_conn, rx->rx_conn->ibc_state,
- rx->rx_conn->ibc_peer->ibp_nid,
- atomic_read (&rx->rx_conn->ibc_refcount));
- atomic_inc (&rx->rx_conn->ibc_refcount);
- /* ...and queue it up */
+ CDEBUG(D_NET, "No data: immediate completion\n");
+ lnet_finalize (kibnal_data.kib_ni, lntmsg,
+ status == 0 ? 0 : -EIO);
+ }
+
kibnal_queue_tx(tx, rx->rx_conn);
}
-ptl_err_t
-kibnal_sendmsg(lib_nal_t *nal,
- void *private,
- lib_msg_t *libmsg,
- ptl_hdr_t *hdr,
- int type,
- ptl_nid_t nid,
- ptl_pid_t pid,
- unsigned int payload_niov,
- struct iovec *payload_iov,
- ptl_kiov_t *payload_kiov,
- size_t payload_offset,
- size_t payload_nob)
+int
+kibnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
- kib_msg_t *ibmsg;
- kib_tx_t *tx;
- int nob;
+ lnet_hdr_t *hdr = &lntmsg->msg_hdr;
+ int type = lntmsg->msg_type;
+ lnet_process_id_t target = lntmsg->msg_target;
+ int target_is_router = lntmsg->msg_target_is_router;
+ int routing = lntmsg->msg_routing;
+ unsigned int payload_niov = lntmsg->msg_niov;
+ struct iovec *payload_iov = lntmsg->msg_iov;
+ lnet_kiov_t *payload_kiov = lntmsg->msg_kiov;
+ unsigned int payload_offset = lntmsg->msg_offset;
+ unsigned int payload_nob = lntmsg->msg_len;
+ kib_msg_t *ibmsg;
+ kib_tx_t *tx;
+ int nob;
/* NB 'private' is different depending on what we're sending.... */
- CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
- " pid %d\n", payload_nob, payload_niov, nid , pid);
+ CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
+ payload_nob, payload_niov, libcfs_id2str(target));
LASSERT (payload_nob == 0 || payload_niov > 0);
- LASSERT (payload_niov <= PTL_MD_MAX_IOV);
+ LASSERT (payload_niov <= LNET_MAX_IOV);
/* Thread context if we're sending payload */
LASSERT (!in_interrupt() || payload_niov == 0);
switch (type) {
default:
LBUG();
- return (PTL_FAIL);
-
- case PTL_MSG_REPLY: {
- /* reply's 'private' is the incoming receive */
- kib_rx_t *rx = private;
-
- /* RDMA reply expected? */
- if (rx->rx_msg->ibm_type == IBNAL_MSG_GET_RDMA) {
- kibnal_start_active_rdma(IBNAL_MSG_GET_DONE, 0,
- rx, libmsg, payload_niov,
- payload_iov, payload_kiov,
- payload_offset, payload_nob);
- return (PTL_OK);
- }
+ return (-EIO);
- /* Incoming message consistent with immediate reply? */
- if (rx->rx_msg->ibm_type != IBNAL_MSG_IMMEDIATE) {
- CERROR ("REPLY to "LPX64" bad opbm type %d!!!\n",
- nid, rx->rx_msg->ibm_type);
- return (PTL_FAIL);
- }
-
- /* Will it fit in a message? */
- nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
- if (nob >= IBNAL_MSG_SIZE) {
- CERROR("REPLY for "LPX64" too big (RDMA not requested): %d\n",
- nid, payload_nob);
- return (PTL_FAIL);
- }
- break;
- }
-
- case PTL_MSG_GET:
- /* might the REPLY message be big enough to need RDMA? */
- nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[libmsg->md->length]);
- if (nob > IBNAL_MSG_SIZE)
- return (kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA,
- nid, libmsg, hdr));
- break;
-
- case PTL_MSG_ACK:
+ case LNET_MSG_ACK:
LASSERT (payload_nob == 0);
break;
- case PTL_MSG_PUT:
- /* Is the payload big enough to need RDMA? */
+ case LNET_MSG_GET:
+ if (routing || target_is_router)
+ break; /* send IMMEDIATE */
+
+ /* is the REPLY message too small for RDMA? */
+ nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
+ if (nob <= IBNAL_MSG_SIZE)
+ break; /* send IMMEDIATE */
+
+ if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
+ return kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA, lntmsg,
+ lntmsg->msg_md->md_niov,
+ lntmsg->msg_md->md_iov.iov, NULL,
+ lntmsg->msg_md->md_length);
+
+ return kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA, lntmsg,
+ lntmsg->msg_md->md_niov,
+ NULL, lntmsg->msg_md->md_iov.kiov,
+ lntmsg->msg_md->md_length);
+
+ case LNET_MSG_REPLY:
+ case LNET_MSG_PUT:
+ /* Is the payload small enough not to need RDMA? */
nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
- if (nob > IBNAL_MSG_SIZE)
- return (kibnal_start_passive_rdma(IBNAL_MSG_PUT_RDMA,
- nid, libmsg, hdr));
+ if (nob <= IBNAL_MSG_SIZE)
+ break; /* send IMMEDIATE */
- break;
+ return kibnal_start_passive_rdma(IBNAL_MSG_PUT_RDMA, lntmsg,
+ payload_niov,
+ payload_iov, payload_kiov,
+ payload_nob);
}
- tx = kibnal_get_idle_tx(!(type == PTL_MSG_ACK ||
- type == PTL_MSG_REPLY ||
- in_interrupt()));
+ /* Send IMMEDIATE */
+
+ tx = kibnal_get_idle_tx();
if (tx == NULL) {
- CERROR ("Can't send %d to "LPX64": tx descs exhausted%s\n",
- type, nid, in_interrupt() ? " (intr)" : "");
- return (PTL_NO_SPACE);
+ CERROR ("Can't send %d to %s: tx descs exhausted%s\n",
+ type, libcfs_nid2str(target.nid),
+ in_interrupt() ? " (intr)" : "");
+ return (-ENOMEM);
}
ibmsg = tx->tx_msg;
ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
- if (payload_nob > 0) {
- if (payload_kiov != NULL)
- lib_copy_kiov2buf(ibmsg->ibm_u.immediate.ibim_payload,
- payload_niov, payload_kiov,
- payload_offset, payload_nob);
- else
- lib_copy_iov2buf(ibmsg->ibm_u.immediate.ibim_payload,
- payload_niov, payload_iov,
- payload_offset, payload_nob);
- }
+ if (payload_kiov != NULL)
+ lnet_copy_kiov2flat(IBNAL_MSG_SIZE, ibmsg,
+ offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
+ payload_niov, payload_kiov,
+ payload_offset, payload_nob);
+ else
+ lnet_copy_iov2flat(IBNAL_MSG_SIZE, ibmsg,
+ offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
+ payload_niov, payload_iov,
+ payload_offset, payload_nob);
kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE,
offsetof(kib_immediate_msg_t,
ibim_payload[payload_nob]));
- /* libmsg gets finalized when tx completes */
- tx->tx_libmsg[0] = libmsg;
-
- kibnal_launch_tx(tx, nid);
- return (PTL_OK);
-}
+ /* lntmsg gets finalized when tx completes */
+ tx->tx_lntmsg[0] = lntmsg;
-ptl_err_t
-kibnal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
- ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
- unsigned int payload_niov, struct iovec *payload_iov,
- size_t payload_offset, size_t payload_len)
-{
- return (kibnal_sendmsg(nal, private, cookie,
- hdr, type, nid, pid,
- payload_niov, payload_iov, NULL,
- payload_offset, payload_len));
+ kibnal_launch_tx(tx, target.nid);
+ return (0);
}
-ptl_err_t
-kibnal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
- ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
- unsigned int payload_niov, ptl_kiov_t *payload_kiov,
- size_t payload_offset, size_t payload_len)
+int
+kibnal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
+ void **new_private)
{
- return (kibnal_sendmsg(nal, private, cookie,
- hdr, type, nid, pid,
- payload_niov, NULL, payload_kiov,
- payload_offset, payload_len));
+ kib_rx_t *rx = private;
+ kib_conn_t *conn = rx->rx_conn;
+
+ if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {
+ /* Can't block if RDMA completions need normal credits */
+ LCONSOLE_ERROR_MSG(0x12a,
+ "Dropping message from %s: no buffers free. "
+ "(%s is running an old version of LNET that may "
+ "deadlock if messages wait for buffers)\n",
+ libcfs_nid2str(conn->ibc_peer->ibp_nid),
+ libcfs_nid2str(conn->ibc_peer->ibp_nid));
+ return -EDEADLK;
+ }
+
+ *new_private = private;
+ return 0;
}
-ptl_err_t
-kibnal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
- unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
- size_t offset, size_t mlen, size_t rlen)
+int
+kibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
+ int delayed, unsigned int niov,
+ struct iovec *iov, lnet_kiov_t *kiov,
+ unsigned int offset, unsigned int mlen, unsigned int rlen)
{
kib_rx_t *rx = private;
kib_msg_t *rxmsg = rx->rx_msg;
int msg_nob;
+ int rc = 0;
LASSERT (mlen <= rlen);
LASSERT (!in_interrupt ());
switch (rxmsg->ibm_type) {
default:
LBUG();
- return (PTL_FAIL);
-
+
case IBNAL_MSG_IMMEDIATE:
msg_nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
- if (msg_nob > IBNAL_MSG_SIZE) {
- CERROR ("Immediate message from "LPX64" too big: %d\n",
- rxmsg->ibm_u.immediate.ibim_hdr.src_nid, rlen);
- return (PTL_FAIL);
+ if (msg_nob > rx->rx_nob) {
+ CERROR ("Immediate message from %s too big: %d(%d)\n",
+ libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
+ msg_nob, rx->rx_nob);
+ rc = -EPROTO;
+ break;
}
if (kiov != NULL)
- lib_copy_buf2kiov(niov, kiov, offset,
- rxmsg->ibm_u.immediate.ibim_payload,
- mlen);
+ lnet_copy_flat2kiov(
+ niov, kiov, offset,
+ IBNAL_MSG_SIZE, rxmsg,
+ offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
+ mlen);
else
- lib_copy_buf2iov(niov, iov, offset,
- rxmsg->ibm_u.immediate.ibim_payload,
- mlen);
+ lnet_copy_flat2iov(
+ niov, iov, offset,
+ IBNAL_MSG_SIZE, rxmsg,
+ offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
+ mlen);
- lib_finalize (nal, NULL, libmsg, PTL_OK);
- return (PTL_OK);
+ lnet_finalize (ni, lntmsg, 0);
+ break;
case IBNAL_MSG_GET_RDMA:
- /* We get called here just to discard any junk after the
- * GET hdr. */
- LASSERT (libmsg == NULL);
- lib_finalize (nal, NULL, libmsg, PTL_OK);
- return (PTL_OK);
+ if (lntmsg != NULL) {
+ /* GET matched: RDMA lntmsg's payload */
+ kibnal_start_active_rdma(IBNAL_MSG_GET_DONE, 0,
+ rx, lntmsg,
+ lntmsg->msg_niov,
+ lntmsg->msg_iov,
+ lntmsg->msg_kiov,
+ lntmsg->msg_offset,
+ lntmsg->msg_len);
+ } else {
+ /* GET didn't match anything */
+ kibnal_start_active_rdma (IBNAL_MSG_GET_DONE, -ENODATA,
+ rx, NULL, 0, NULL, NULL, 0, 0);
+ }
+ break;
case IBNAL_MSG_PUT_RDMA:
- kibnal_start_active_rdma (IBNAL_MSG_PUT_DONE, 0,
- rx, libmsg,
+ kibnal_start_active_rdma (IBNAL_MSG_PUT_DONE, 0, rx, lntmsg,
niov, iov, kiov, offset, mlen);
- return (PTL_OK);
+ break;
}
-}
-
-ptl_err_t
-kibnal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
- unsigned int niov, struct iovec *iov,
- size_t offset, size_t mlen, size_t rlen)
-{
- return (kibnal_recvmsg (nal, private, msg, niov, iov, NULL,
- offset, mlen, rlen));
-}
-ptl_err_t
-kibnal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
- unsigned int niov, ptl_kiov_t *kiov,
- size_t offset, size_t mlen, size_t rlen)
-{
- return (kibnal_recvmsg (nal, private, msg, niov, NULL, kiov,
- offset, mlen, rlen));
+ kibnal_post_rx(rx, 1, 0);
+ return rc;
}
int
}
void
+kibnal_peer_alive (kib_peer_t *peer)
+{
+ /* This is racy, but everyone's only writing cfs_time_current() */
+ peer->ibp_last_alive = cfs_time_current();
+ mb();
+}
+
+void
+kibnal_peer_notify (kib_peer_t *peer)
+{
+ time_t last_alive = 0;
+ int error = 0;
+ unsigned long flags;
+
+ read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
+
+ if (list_empty(&peer->ibp_conns) &&
+ peer->ibp_accepting == 0 &&
+ peer->ibp_connecting == 0 &&
+ peer->ibp_error != 0) {
+ error = peer->ibp_error;
+ peer->ibp_error = 0;
+ last_alive = cfs_time_current_sec() -
+ cfs_duration_sec(cfs_time_current() -
+ peer->ibp_last_alive);
+ }
+
+ read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
+
+ if (error != 0)
+ lnet_notify(kibnal_data.kib_ni, peer->ibp_nid, 0, last_alive);
+}
+
+void
kibnal_close_conn_locked (kib_conn_t *conn, int error)
{
/* This just does the immediate housekeeping, and schedules the
- * connection for the connd to finish off.
+ * connection for the reaper to finish off.
* Caller holds kib_global_lock exclusively in irq context */
kib_peer_t *peer = conn->ibc_peer;
- CDEBUG (error == 0 ? D_NET : D_ERROR,
- "closing conn to "LPX64": error %d\n", peer->ibp_nid, error);
+ CDEBUG (error == 0 ? D_NET : D_NETERROR,
+ "closing conn to %s: error %d\n",
+ libcfs_nid2str(peer->ibp_nid), error);
LASSERT (conn->ibc_state == IBNAL_CONN_ESTABLISHED ||
conn->ibc_state == IBNAL_CONN_CONNECTING);
if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
- /* kib_connd_conns takes ibc_list's ref */
+ /* kib_reaper_conns takes ibc_list's ref */
list_del (&conn->ibc_list);
} else {
- /* new ref for kib_connd_conns */
- CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
- conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
- atomic_read (&conn->ibc_refcount));
- atomic_inc (&conn->ibc_refcount);
+ /* new ref for kib_reaper_conns */
+ kibnal_conn_addref(conn);
}
- if (list_empty (&peer->ibp_conns) &&
- peer->ibp_persistence == 0) {
- /* Non-persistent peer with no more conns... */
- kibnal_unlink_peer_locked (peer);
+ if (list_empty (&peer->ibp_conns)) { /* no more conns */
+ if (peer->ibp_persistence == 0 && /* non-persistent peer */
+ kibnal_peer_active(peer)) /* still in peer table */
+ kibnal_unlink_peer_locked (peer);
+
+ peer->ibp_error = error; /* set/clear error on last conn */
}
conn->ibc_state = IBNAL_CONN_DEATHROW;
/* Schedule conn for closing/destruction */
- spin_lock (&kibnal_data.kib_connd_lock);
+ spin_lock (&kibnal_data.kib_reaper_lock);
- list_add_tail (&conn->ibc_list, &kibnal_data.kib_connd_conns);
- wake_up (&kibnal_data.kib_connd_waitq);
+ list_add_tail (&conn->ibc_list, &kibnal_data.kib_reaper_conns);
+ wake_up (&kibnal_data.kib_reaper_waitq);
- spin_unlock (&kibnal_data.kib_connd_lock);
+ spin_unlock (&kibnal_data.kib_reaper_lock);
}
int
}
void
-kibnal_peer_connect_failed (kib_peer_t *peer, int active, int rc)
+kibnal_peer_connect_failed (kib_peer_t *peer, int active, int error)
{
LIST_HEAD (zombies);
- kib_tx_t *tx;
unsigned long flags;
- LASSERT (rc != 0);
- LASSERT (peer->ibp_reconnect_interval >= IBNAL_MIN_RECONNECT_INTERVAL);
+ LASSERT(error != 0);
write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
- LASSERT (peer->ibp_connecting != 0);
- peer->ibp_connecting--;
-
- if (peer->ibp_connecting != 0) {
- /* another connection attempt under way (loopback?)... */
- write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
+ if (active) {
+ LASSERT (peer->ibp_connecting != 0);
+ peer->ibp_connecting--;
+ } else {
+ LASSERT (peer->ibp_accepting != 0);
+ peer->ibp_accepting--;
+ }
+
+ if (peer->ibp_connecting != 0 ||
+ peer->ibp_accepting != 0) {
+ /* another connection attempt under way... */
+ write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
return;
}
if (list_empty(&peer->ibp_conns)) {
/* Say when active connection can be re-attempted */
- peer->ibp_reconnect_time = jiffies + peer->ibp_reconnect_interval;
- /* Increase reconnection interval */
- peer->ibp_reconnect_interval = MIN (peer->ibp_reconnect_interval * 2,
- IBNAL_MAX_RECONNECT_INTERVAL);
+ peer->ibp_reconnect_interval *= 2;
+ peer->ibp_reconnect_interval =
+ MAX(peer->ibp_reconnect_interval,
+ *kibnal_tunables.kib_min_reconnect_interval);
+ peer->ibp_reconnect_interval =
+ MIN(peer->ibp_reconnect_interval,
+ *kibnal_tunables.kib_max_reconnect_interval);
+
+ peer->ibp_reconnect_time = jiffies +
+ peer->ibp_reconnect_interval * HZ;
- /* Take peer's blocked blocked transmits; I'll complete
+ /* Take peer's blocked transmits; I'll complete
* them with error */
- while (!list_empty (&peer->ibp_tx_queue)) {
- tx = list_entry (peer->ibp_tx_queue.next,
- kib_tx_t, tx_list);
-
- list_del (&tx->tx_list);
- list_add_tail (&tx->tx_list, &zombies);
- }
+ list_add(&zombies, &peer->ibp_tx_queue);
+ list_del_init(&peer->ibp_tx_queue);
if (kibnal_peer_active(peer) &&
(peer->ibp_persistence == 0)) {
/* failed connection attempt on non-persistent peer */
kibnal_unlink_peer_locked (peer);
}
+
+ peer->ibp_error = error;
} else {
/* Can't have blocked transmits if there are connections */
LASSERT (list_empty(&peer->ibp_tx_queue));
write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
+ kibnal_peer_notify(peer);
+
if (!list_empty (&zombies))
- CERROR ("Deleting messages for "LPX64": connection failed\n",
- peer->ibp_nid);
-
- while (!list_empty (&zombies)) {
- tx = list_entry (zombies.next, kib_tx_t, tx_list);
+ CDEBUG (D_NETERROR, "Deleting messages for %s: connection failed\n",
+ libcfs_nid2str(peer->ibp_nid));
- list_del (&tx->tx_list);
- /* complete now */
- tx->tx_status = -EHOSTUNREACH;
- kibnal_tx_done (tx);
- }
+ kibnal_txlist_done(&zombies, -EHOSTUNREACH);
}
void
int rc;
int i;
- /* passive connection has no connreq & vice versa */
- LASSERT (!active == !(conn->ibc_connreq != NULL));
- if (active) {
- PORTAL_FREE (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
+ if (conn->ibc_connreq != NULL) {
+ LIBCFS_FREE (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
conn->ibc_connreq = NULL;
}
- if (state == IBNAL_CONN_CONNECTING) {
- /* Install common (active/passive) callback for
- * disconnect/idle notification if I got as far as getting
- * a CM comm_id */
- rc = tsIbCmCallbackModify(conn->ibc_comm_id,
- kibnal_conn_callback, conn);
- LASSERT (rc == 0);
+ switch (state) {
+ case IBNAL_CONN_CONNECTING:
+ /* conn has a CM comm_id */
+ if (status == 0) {
+ /* Install common (active/passive) callback for
+ * disconnect/idle notification */
+ rc = tsIbCmCallbackModify(conn->ibc_comm_id,
+ kibnal_conn_callback,
+ conn);
+ LASSERT (rc == 0);
+ } else {
+ /* LASSERT (no more CM callbacks) */
+ rc = tsIbCmCallbackModify(conn->ibc_comm_id,
+ kibnal_bad_conn_callback,
+ conn);
+ LASSERT (rc == 0);
+ }
+ break;
+
+ case IBNAL_CONN_INIT_QP:
+ LASSERT (status != 0);
+ break;
+
+ default:
+ LBUG();
}
write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
- LASSERT (peer->ibp_connecting != 0);
+ if (active)
+ LASSERT (peer->ibp_connecting != 0);
+ else
+ LASSERT (peer->ibp_accepting != 0);
- if (status == 0) {
- /* connection established... */
- LASSERT (state == IBNAL_CONN_CONNECTING);
- conn->ibc_state = IBNAL_CONN_ESTABLISHED;
-
- if (!kibnal_peer_active(peer)) {
- /* ...but peer deleted meantime */
- status = -ECONNABORTED;
- }
- } else {
- LASSERT (state == IBNAL_CONN_INIT_QP ||
- state == IBNAL_CONN_CONNECTING);
- }
+ if (status == 0 && /* connection established */
+ kibnal_peer_active(peer)) { /* peer not deleted */
- if (status == 0) {
- /* Everything worked! */
+ if (active)
+ peer->ibp_connecting--;
+ else
+ peer->ibp_accepting--;
- peer->ibp_connecting--;
+ conn->ibc_last_send = jiffies;
+ conn->ibc_state = IBNAL_CONN_ESTABLISHED;
+ kibnal_peer_alive(peer);
/* +1 ref for ibc_list; caller(== CM)'s ref remains until
* the IB_CM_IDLE callback */
- CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
- conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
- atomic_read (&conn->ibc_refcount));
- atomic_inc (&conn->ibc_refcount);
+ kibnal_conn_addref(conn);
list_add (&conn->ibc_list, &peer->ibp_conns);
-
- /* reset reconnect interval for next attempt */
- peer->ibp_reconnect_interval = IBNAL_MIN_RECONNECT_INTERVAL;
+
+ peer->ibp_reconnect_interval = 0; /* OK to reconnect at any time */
/* post blocked sends to the new connection */
spin_lock (&conn->ibc_lock);
list_del (&tx->tx_list);
- /* +1 ref for each tx */
- CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
- conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
- atomic_read (&conn->ibc_refcount));
- atomic_inc (&conn->ibc_refcount);
kibnal_queue_tx_locked (tx, conn);
}
/* queue up all the receives */
for (i = 0; i < IBNAL_RX_MSGS; i++) {
/* +1 ref for rx desc */
- CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
- conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
- atomic_read (&conn->ibc_refcount));
- atomic_inc (&conn->ibc_refcount);
+ kibnal_conn_addref(conn);
CDEBUG(D_NET, "RX[%d] %p->%p - "LPX64"\n",
i, &conn->ibc_rxs[i], conn->ibc_rxs[i].rx_msg,
conn->ibc_rxs[i].rx_vaddr);
- kibnal_post_rx (&conn->ibc_rxs[i], 0);
+ kibnal_post_rx (&conn->ibc_rxs[i], 0, 0);
}
kibnal_check_sends (conn);
return;
}
- /* connection failed */
- if (state == IBNAL_CONN_CONNECTING) {
- /* schedule for connd to close */
+ if (status == 0) {
+ /* connection established, but peer was deleted. Schedule for
+ * reaper to cm_disconnect... */
+ status = -ECONNABORTED;
kibnal_close_conn_locked (conn, status);
} else {
- /* Don't have a CM comm_id; just wait for refs to drain */
+ /* just waiting for refs to drain */
conn->ibc_state = IBNAL_CONN_ZOMBIE;
}
write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
kibnal_peer_connect_failed (conn->ibc_peer, active, status);
-
- if (state != IBNAL_CONN_CONNECTING) {
- /* drop caller's ref if we're not waiting for the
- * IB_CM_IDLE callback */
- kibnal_put_conn (conn);
- }
}
int
-kibnal_accept (kib_conn_t **connp, tTS_IB_CM_COMM_ID cid,
- ptl_nid_t nid, __u64 incarnation, int queue_depth)
+kibnal_accept_connreq (kib_conn_t **connp, tTS_IB_CM_COMM_ID cid,
+ kib_msg_t *msg, int nob)
{
- kib_conn_t *conn = kibnal_create_conn();
+ kib_conn_t *conn;
kib_peer_t *peer;
kib_peer_t *peer2;
unsigned long flags;
+ int rc;
- if (conn == NULL)
- return (-ENOMEM);
+ rc = kibnal_unpack_msg(msg, 0, nob);
+ if (rc != 0) {
+ CERROR("Can't unpack connreq msg: %d\n", rc);
+ return -EPROTO;
+ }
+
+ CDEBUG(D_NET, "connreq from %s\n", libcfs_nid2str(msg->ibm_srcnid));
- if (queue_depth != IBNAL_MSG_QUEUE_SIZE) {
- CERROR("Can't accept "LPX64": bad queue depth %d (%d expected)\n",
- nid, queue_depth, IBNAL_MSG_QUEUE_SIZE);
+ if (msg->ibm_type != IBNAL_MSG_CONNREQ) {
+ CERROR("Unexpected connreq msg type: %x from %s\n",
+ msg->ibm_type, libcfs_nid2str(msg->ibm_srcnid));
+ return -EPROTO;
+ }
+
+ if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
+ CERROR("Can't accept %s: bad queue depth %d (%d expected)\n",
+ libcfs_nid2str(msg->ibm_srcnid),
+ msg->ibm_u.connparams.ibcp_queue_depth,
+ IBNAL_MSG_QUEUE_SIZE);
return (-EPROTO);
}
+ conn = kibnal_create_conn();
+ if (conn == NULL)
+ return (-ENOMEM);
+
/* assume 'nid' is a new peer */
- peer = kibnal_create_peer (nid);
- if (peer == NULL) {
- CDEBUG(D_NET, "--conn[%p] state %d -> "LPX64" (%d)\n",
- conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
- atomic_read (&conn->ibc_refcount));
- atomic_dec (&conn->ibc_refcount);
- kibnal_destroy_conn(conn);
+ rc = kibnal_create_peer(&peer, msg->ibm_srcnid);
+ if (rc != 0) {
+ kibnal_conn_decref(conn);
return (-ENOMEM);
}
write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
- peer2 = kibnal_find_peer_locked(nid);
+ if (kibnal_data.kib_nonewpeers) {
+ write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
+
+ CERROR ("Shutdown has started, drop connreq from %s\n",
+ libcfs_nid2str(msg->ibm_srcnid));
+ kibnal_conn_decref(conn);
+ kibnal_peer_decref(peer);
+ return -ESHUTDOWN;
+ }
+
+ /* Check I'm the same instance that gave the connection parameters.
+ * NB If my incarnation changes after this, the peer will get nuked and
+ * we'll spot that when the connection is finally added into the peer's
+ * connlist */
+ if (!lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
+ msg->ibm_dstnid) ||
+ msg->ibm_dststamp != kibnal_data.kib_incarnation) {
+ write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
+
+ CERROR("Stale connection params from %s\n",
+ libcfs_nid2str(msg->ibm_srcnid));
+ kibnal_conn_decref(conn);
+ kibnal_peer_decref(peer);
+ return -ESTALE;
+ }
+
+ peer2 = kibnal_find_peer_locked(msg->ibm_srcnid);
if (peer2 == NULL) {
+ /* Brand new peer */
+ LASSERT (peer->ibp_accepting == 0);
+
/* peer table takes my ref on peer */
list_add_tail (&peer->ibp_list,
- kibnal_nid2peerlist(nid));
+ kibnal_nid2peerlist(msg->ibm_srcnid));
} else {
- kibnal_put_peer (peer);
+ /* tie-break connection race in favour of the higher NID */
+ if (peer2->ibp_connecting != 0 &&
+ msg->ibm_srcnid < kibnal_data.kib_ni->ni_nid) {
+ write_unlock_irqrestore(&kibnal_data.kib_global_lock,
+ flags);
+ CWARN("Conn race %s\n",
+ libcfs_nid2str(peer2->ibp_nid));
+
+ kibnal_conn_decref(conn);
+ kibnal_peer_decref(peer);
+ return -EALREADY;
+ }
+
+ kibnal_peer_decref(peer);
peer = peer2;
}
/* +1 ref for conn */
- atomic_inc (&peer->ibp_refcount);
- peer->ibp_connecting++;
+ kibnal_peer_addref(peer);
+ peer->ibp_accepting++;
write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
conn->ibc_peer = peer;
conn->ibc_state = IBNAL_CONN_CONNECTING;
conn->ibc_comm_id = cid;
- conn->ibc_incarnation = incarnation;
+ conn->ibc_incarnation = msg->ibm_srcstamp;
conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
+ conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
+ conn->ibc_version = msg->ibm_version;
*connp = conn;
return (0);
}
tTS_IB_CM_CALLBACK_RETURN
-kibnal_idle_conn_callback (tTS_IB_CM_EVENT event,
- tTS_IB_CM_COMM_ID cid,
- void *param,
- void *arg)
+kibnal_bad_conn_callback (tTS_IB_CM_EVENT event,
+ tTS_IB_CM_COMM_ID cid,
+ void *param,
+ void *arg)
{
- /* Shouldn't ever get a callback after TS_IB_CM_IDLE */
CERROR ("Unexpected event %d: conn %p\n", event, arg);
LBUG ();
return TS_IB_CM_CALLBACK_PROCEED;
}
-tTS_IB_CM_CALLBACK_RETURN
-kibnal_conn_callback (tTS_IB_CM_EVENT event,
- tTS_IB_CM_COMM_ID cid,
- void *param,
- void *arg)
+void
+kibnal_abort_txs (kib_conn_t *conn, struct list_head *txs)
{
- kib_conn_t *conn = arg;
LIST_HEAD (zombies);
struct list_head *tmp;
struct list_head *nxt;
kib_tx_t *tx;
unsigned long flags;
- int done;
+
+ spin_lock_irqsave (&conn->ibc_lock, flags);
+
+ list_for_each_safe (tmp, nxt, txs) {
+ tx = list_entry (tmp, kib_tx_t, tx_list);
+
+ if (txs == &conn->ibc_active_txs) {
+ LASSERT (tx->tx_passive_rdma ||
+ !tx->tx_passive_rdma_wait);
+
+ LASSERT (tx->tx_passive_rdma_wait ||
+ tx->tx_sending != 0);
+ } else {
+ LASSERT (!tx->tx_passive_rdma_wait);
+ LASSERT (tx->tx_sending == 0);
+ }
+
+ tx->tx_status = -ECONNABORTED;
+ tx->tx_passive_rdma_wait = 0;
+
+ if (tx->tx_sending == 0) {
+ list_del (&tx->tx_list);
+ list_add (&tx->tx_list, &zombies);
+ }
+ }
+
+ spin_unlock_irqrestore (&conn->ibc_lock, flags);
+
+ kibnal_txlist_done (&zombies, -ECONNABORTED);
+}
+
+tTS_IB_CM_CALLBACK_RETURN
+kibnal_conn_callback (tTS_IB_CM_EVENT event,
+ tTS_IB_CM_COMM_ID cid,
+ void *param,
+ void *arg)
+{
+ kib_conn_t *conn = arg;
int rc;
/* Established Connection Notifier */
switch (event) {
default:
- CERROR("Connection %p -> "LPX64" ERROR %d\n",
- conn, conn->ibc_peer->ibp_nid, event);
+ CDEBUG(D_NETERROR, "Connection %p -> %s ERROR %d\n",
+ conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), event);
kibnal_close_conn (conn, -ECONNABORTED);
break;
case TS_IB_CM_DISCONNECTED:
- CDEBUG(D_WARNING, "Connection %p -> "LPX64" DISCONNECTED.\n",
- conn, conn->ibc_peer->ibp_nid);
+ CDEBUG(D_NETERROR, "Connection %p -> %s DISCONNECTED.\n",
+ conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
kibnal_close_conn (conn, 0);
break;
case TS_IB_CM_IDLE:
- CDEBUG(D_NET, "Connection %p -> "LPX64" IDLE.\n",
- conn, conn->ibc_peer->ibp_nid);
- kibnal_put_conn (conn); /* Lose CM's ref */
+ CDEBUG(D_NET, "Connection %p -> %s IDLE.\n",
+ conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
/* LASSERT (no further callbacks) */
- rc = tsIbCmCallbackModify(cid,
- kibnal_idle_conn_callback, conn);
+ rc = tsIbCmCallbackModify(cid, kibnal_bad_conn_callback, conn);
LASSERT (rc == 0);
/* NB we wait until the connection has closed before
* completing outstanding passive RDMAs so we can be sure
* the network can't touch the mapped memory any more. */
- spin_lock_irqsave (&conn->ibc_lock, flags);
-
- /* grab passive RDMAs not waiting for the tx callback */
- list_for_each_safe (tmp, nxt, &conn->ibc_active_txs) {
- tx = list_entry (tmp, kib_tx_t, tx_list);
-
- LASSERT (tx->tx_passive_rdma ||
- !tx->tx_passive_rdma_wait);
-
- LASSERT (tx->tx_passive_rdma_wait ||
- tx->tx_sending != 0);
-
- /* still waiting for tx callback? */
- if (!tx->tx_passive_rdma_wait)
- continue;
-
- tx->tx_status = -ECONNABORTED;
- tx->tx_passive_rdma_wait = 0;
- done = (tx->tx_sending == 0);
-
- if (!done)
- continue;
-
- list_del (&tx->tx_list);
- list_add (&tx->tx_list, &zombies);
- }
-
- /* grab all blocked transmits */
- list_for_each_safe (tmp, nxt, &conn->ibc_tx_queue) {
- tx = list_entry (tmp, kib_tx_t, tx_list);
-
- list_del (&tx->tx_list);
- list_add (&tx->tx_list, &zombies);
- }
+ kibnal_abort_txs(conn, &conn->ibc_tx_queue);
+ kibnal_abort_txs(conn, &conn->ibc_tx_queue_rsrvd);
+ kibnal_abort_txs(conn, &conn->ibc_tx_queue_nocred);
+ kibnal_abort_txs(conn, &conn->ibc_active_txs);
- spin_unlock_irqrestore (&conn->ibc_lock, flags);
-
- while (!list_empty(&zombies)) {
- tx = list_entry (zombies.next, kib_tx_t, tx_list);
-
- list_del(&tx->tx_list);
- kibnal_tx_done (tx);
- }
+ kibnal_conn_decref(conn); /* Lose CM's ref */
break;
}
tTS_IB_CM_CALLBACK_RETURN
kibnal_passive_conn_callback (tTS_IB_CM_EVENT event,
- tTS_IB_CM_COMM_ID cid,
- void *param,
- void *arg)
+ tTS_IB_CM_COMM_ID cid,
+ void *param,
+ void *arg)
{
- kib_conn_t *conn = arg;
+ kib_conn_t *conn = arg;
int rc;
switch (event) {
return TS_IB_CM_CALLBACK_ABORT;
}
- CERROR ("Unexpected event %p -> "LPX64": %d\n",
- conn, conn->ibc_peer->ibp_nid, event);
- kibnal_connreq_done (conn, 0, -ECONNABORTED);
- break;
+ CERROR ("%s event %p -> %s: %d\n",
+ (event == TS_IB_CM_IDLE) ? "IDLE" : "Unexpected",
+ conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), event);
+ kibnal_connreq_done(conn, 0, -ECONNABORTED);
+ kibnal_conn_decref(conn); /* drop CM's ref */
+ return TS_IB_CM_CALLBACK_ABORT;
case TS_IB_CM_REQ_RECEIVED: {
struct ib_cm_req_received_param *req = param;
- kib_wire_connreq_t *wcr = req->remote_private_data;
+ kib_msg_t *msg = req->remote_private_data;
LASSERT (conn == NULL);
- CDEBUG(D_NET, "REQ from "LPX64"\n", le64_to_cpu(wcr->wcr_nid));
+ /* Don't really know srcnid until successful unpack */
+ CDEBUG(D_NET, "REQ from ?%s?\n", libcfs_nid2str(msg->ibm_srcnid));
- if (req->remote_private_data_len < sizeof (*wcr)) {
- CERROR("Connect from remote LID %04x: too short %d\n",
- req->dlid, req->remote_private_data_len);
- return TS_IB_CM_CALLBACK_ABORT;
- }
-
- if (wcr->wcr_magic != cpu_to_le32(IBNAL_MSG_MAGIC)) {
- CERROR ("Can't accept LID %04x: bad magic %08x\n",
- req->dlid, le32_to_cpu(wcr->wcr_magic));
- return TS_IB_CM_CALLBACK_ABORT;
- }
-
- if (wcr->wcr_version != cpu_to_le16(IBNAL_MSG_VERSION)) {
- CERROR ("Can't accept LID %04x: bad version %d\n",
- req->dlid, le16_to_cpu(wcr->wcr_magic));
- return TS_IB_CM_CALLBACK_ABORT;
- }
-
- rc = kibnal_accept(&conn,
- cid,
- le64_to_cpu(wcr->wcr_nid),
- le64_to_cpu(wcr->wcr_incarnation),
- le16_to_cpu(wcr->wcr_queue_depth));
+ rc = kibnal_accept_connreq(&conn, cid, msg,
+ req->remote_private_data_len);
if (rc != 0) {
- CERROR ("Can't accept "LPX64": %d\n",
- le64_to_cpu(wcr->wcr_nid), rc);
+ CERROR ("Can't accept ?%s?: %d\n",
+ libcfs_nid2str(msg->ibm_srcnid), rc);
return TS_IB_CM_CALLBACK_ABORT;
}
/* update 'arg' for next callback */
- rc = tsIbCmCallbackModify(cid,
- kibnal_passive_conn_callback, conn);
+ rc = tsIbCmCallbackModify(cid, kibnal_passive_conn_callback, conn);
LASSERT (rc == 0);
+ msg = req->accept_param.reply_private_data;
+ kibnal_init_msg(msg, IBNAL_MSG_CONNACK,
+ sizeof(msg->ibm_u.connparams));
+
+ msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
+
+ kibnal_pack_msg(msg, conn->ibc_version, 0,
+ conn->ibc_peer->ibp_nid,
+ conn->ibc_incarnation);
+
req->accept_param.qp = conn->ibc_qp;
- *((kib_wire_connreq_t *)req->accept_param.reply_private_data)
- = (kib_wire_connreq_t) {
- .wcr_magic = cpu_to_le32(IBNAL_MSG_MAGIC),
- .wcr_version = cpu_to_le16(IBNAL_MSG_VERSION),
- .wcr_queue_depth = cpu_to_le32(IBNAL_MSG_QUEUE_SIZE),
- .wcr_nid = cpu_to_le64(kibnal_data.kib_nid),
- .wcr_incarnation = cpu_to_le64(kibnal_data.kib_incarnation),
- };
- req->accept_param.reply_private_data_len = sizeof(kib_wire_connreq_t);
+ req->accept_param.reply_private_data_len = msg->ibm_nob;
req->accept_param.responder_resources = IBNAL_RESPONDER_RESOURCES;
req->accept_param.initiator_depth = IBNAL_RESPONDER_RESOURCES;
req->accept_param.rnr_retry_count = IBNAL_RNR_RETRY;
req->accept_param.flow_control = IBNAL_FLOW_CONTROL;
CDEBUG(D_NET, "Proceeding\n");
- break;
+ return TS_IB_CM_CALLBACK_PROCEED; /* CM takes my ref on conn */
}
case TS_IB_CM_ESTABLISHED:
LASSERT (conn != NULL);
- CDEBUG(D_WARNING, "Connection %p -> "LPX64" ESTABLISHED.\n",
- conn, conn->ibc_peer->ibp_nid);
+ CWARN("Connection %p -> %s ESTABLISHED.\n",
+ conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
- kibnal_connreq_done (conn, 0, 0);
- break;
+ kibnal_connreq_done(conn, 0, 0);
+ return TS_IB_CM_CALLBACK_PROCEED;
}
-
- /* NB if the connreq is done, we switch to kibnal_conn_callback */
- return TS_IB_CM_CALLBACK_PROCEED;
}
tTS_IB_CM_CALLBACK_RETURN
kibnal_active_conn_callback (tTS_IB_CM_EVENT event,
- tTS_IB_CM_COMM_ID cid,
- void *param,
- void *arg)
+ tTS_IB_CM_COMM_ID cid,
+ void *param,
+ void *arg)
{
- kib_conn_t *conn = arg;
+ kib_conn_t *conn = arg;
+ unsigned long flags;
switch (event) {
case TS_IB_CM_REP_RECEIVED: {
struct ib_cm_rep_received_param *rep = param;
- kib_wire_connreq_t *wcr = rep->remote_private_data;
+ kib_msg_t *msg = rep->remote_private_data;
+ int nob = rep->remote_private_data_len;
+ int rc;
- if (rep->remote_private_data_len < sizeof (*wcr)) {
- CERROR ("Short reply from "LPX64": %d\n",
- conn->ibc_peer->ibp_nid,
- rep->remote_private_data_len);
- kibnal_connreq_done (conn, 1, -EPROTO);
- break;
+ rc = kibnal_unpack_msg(msg, conn->ibc_version, nob);
+ if (rc != 0) {
+ CERROR ("Error %d unpacking conn ack from %s\n",
+ rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
+ kibnal_connreq_done(conn, 1, rc);
+ kibnal_conn_decref(conn); /* drop CM's ref */
+ return TS_IB_CM_CALLBACK_ABORT;
}
- if (wcr->wcr_magic != cpu_to_le32(IBNAL_MSG_MAGIC)) {
- CERROR ("Can't connect "LPX64": bad magic %08x\n",
- conn->ibc_peer->ibp_nid, le32_to_cpu(wcr->wcr_magic));
- kibnal_connreq_done (conn, 1, -EPROTO);
- break;
+ if (msg->ibm_type != IBNAL_MSG_CONNACK) {
+ CERROR ("Unexpected conn ack type %d from %s\n",
+ msg->ibm_type,
+ libcfs_nid2str(conn->ibc_peer->ibp_nid));
+ kibnal_connreq_done(conn, 1, -EPROTO);
+ kibnal_conn_decref(conn); /* drop CM's ref */
+ return TS_IB_CM_CALLBACK_ABORT;
}
-
- if (wcr->wcr_version != cpu_to_le16(IBNAL_MSG_VERSION)) {
- CERROR ("Can't connect "LPX64": bad version %d\n",
- conn->ibc_peer->ibp_nid, le16_to_cpu(wcr->wcr_magic));
- kibnal_connreq_done (conn, 1, -EPROTO);
- break;
+
+ if (!lnet_ptlcompat_matchnid(conn->ibc_peer->ibp_nid,
+ msg->ibm_srcnid) ||
+ !lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
+ msg->ibm_dstnid) ||
+ msg->ibm_srcstamp != conn->ibc_incarnation ||
+ msg->ibm_dststamp != kibnal_data.kib_incarnation) {
+ CERROR("Stale conn ack from %s\n",
+ libcfs_nid2str(conn->ibc_peer->ibp_nid));
+ kibnal_connreq_done(conn, 1, -ESTALE);
+ kibnal_conn_decref(conn); /* drop CM's ref */
+ return TS_IB_CM_CALLBACK_ABORT;
}
-
- if (wcr->wcr_queue_depth != cpu_to_le16(IBNAL_MSG_QUEUE_SIZE)) {
- CERROR ("Can't connect "LPX64": bad queue depth %d\n",
- conn->ibc_peer->ibp_nid, le16_to_cpu(wcr->wcr_queue_depth));
- kibnal_connreq_done (conn, 1, -EPROTO);
- break;
+
+ if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
+ CERROR ("Bad queue depth %d from %s\n",
+ msg->ibm_u.connparams.ibcp_queue_depth,
+ libcfs_nid2str(conn->ibc_peer->ibp_nid));
+ kibnal_connreq_done(conn, 1, -EPROTO);
+ kibnal_conn_decref(conn); /* drop CM's ref */
+ return TS_IB_CM_CALLBACK_ABORT;
}
- if (le64_to_cpu(wcr->wcr_nid) != conn->ibc_peer->ibp_nid) {
- CERROR ("Unexpected NID "LPX64" from "LPX64"\n",
- le64_to_cpu(wcr->wcr_nid), conn->ibc_peer->ibp_nid);
- kibnal_connreq_done (conn, 1, -EPROTO);
- break;
- }
-
- CDEBUG(D_NET, "Connection %p -> "LPX64" REP_RECEIVED.\n",
- conn, conn->ibc_peer->ibp_nid);
+ CDEBUG(D_NET, "Connection %p -> %s REP_RECEIVED.\n",
+ conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
- conn->ibc_incarnation = le64_to_cpu(wcr->wcr_incarnation);
conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
- break;
+ conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
+ return TS_IB_CM_CALLBACK_PROCEED;
}
case TS_IB_CM_ESTABLISHED:
- CDEBUG(D_WARNING, "Connection %p -> "LPX64" Established\n",
- conn, conn->ibc_peer->ibp_nid);
+ CWARN("Connection %p -> %s ESTABLISHED\n",
+ conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
- kibnal_connreq_done (conn, 1, 0);
- break;
+ kibnal_connreq_done(conn, 1, 0);
+ return TS_IB_CM_CALLBACK_PROCEED;
case TS_IB_CM_IDLE:
- CERROR("Connection %p -> "LPX64" IDLE\n",
- conn, conn->ibc_peer->ibp_nid);
- /* Back out state change: I'm disengaged from CM */
- conn->ibc_state = IBNAL_CONN_INIT_QP;
-
- kibnal_connreq_done (conn, 1, -ECONNABORTED);
- break;
+ CDEBUG(D_NETERROR, "Connection %p -> %s IDLE\n",
+ conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
+ /* I assume this connection attempt was rejected because the
+ * peer found a stale QP; I'll just try again */
+ write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
+ kibnal_schedule_active_connect_locked(conn->ibc_peer);
+ write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
+
+ kibnal_connreq_done(conn, 1, -ECONNABORTED);
+ kibnal_conn_decref(conn); /* drop CM's ref */
+ return TS_IB_CM_CALLBACK_ABORT;
default:
- CERROR("Connection %p -> "LPX64" ERROR %d\n",
- conn, conn->ibc_peer->ibp_nid, event);
- kibnal_connreq_done (conn, 1, -ECONNABORTED);
- break;
+ CDEBUG(D_NETERROR, "Connection %p -> %s ERROR %d\n",
+ conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), event);
+ kibnal_connreq_done(conn, 1, -ECONNABORTED);
+ kibnal_conn_decref(conn); /* drop CM's ref */
+ return TS_IB_CM_CALLBACK_ABORT;
}
-
- /* NB if the connreq is done, we switch to kibnal_conn_callback */
- return TS_IB_CM_CALLBACK_PROCEED;
}
int
void *arg)
{
kib_conn_t *conn = arg;
-
+ kib_peer_t *peer = conn->ibc_peer;
+ kib_msg_t *msg = &conn->ibc_connreq->cr_msg;
+
if (status != 0) {
- CERROR ("status %d\n", status);
- kibnal_connreq_done (conn, 1, status);
- goto out;
+ CDEBUG (D_NETERROR, "Pathreq %p -> %s failed: %d\n",
+ conn, libcfs_nid2str(peer->ibp_nid), status);
+ kibnal_connreq_done(conn, 1, status);
+ kibnal_conn_decref(conn); /* drop callback's ref */
+ return 1; /* non-zero prevents further callbacks */
}
conn->ibc_connreq->cr_path = *resp;
- conn->ibc_connreq->cr_wcr = (kib_wire_connreq_t) {
- .wcr_magic = cpu_to_le32(IBNAL_MSG_MAGIC),
- .wcr_version = cpu_to_le16(IBNAL_MSG_VERSION),
- .wcr_queue_depth = cpu_to_le16(IBNAL_MSG_QUEUE_SIZE),
- .wcr_nid = cpu_to_le64(kibnal_data.kib_nid),
- .wcr_incarnation = cpu_to_le64(kibnal_data.kib_incarnation),
- };
+ kibnal_init_msg(msg, IBNAL_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
+ msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
+ kibnal_pack_msg(msg, conn->ibc_version, 0,
+ peer->ibp_nid, conn->ibc_incarnation);
conn->ibc_connreq->cr_connparam = (struct ib_cm_active_param) {
.qp = conn->ibc_qp,
- .req_private_data = &conn->ibc_connreq->cr_wcr,
- .req_private_data_len = sizeof(conn->ibc_connreq->cr_wcr),
+ .req_private_data = msg,
+ .req_private_data_len = msg->ibm_nob,
.responder_resources = IBNAL_RESPONDER_RESOURCES,
.initiator_depth = IBNAL_RESPONDER_RESOURCES,
.retry_count = IBNAL_RETRY,
.rnr_retry_count = IBNAL_RNR_RETRY,
- .cm_response_timeout = kibnal_tunables.kib_io_timeout,
+ .cm_response_timeout = *kibnal_tunables.kib_timeout,
.max_cm_retries = IBNAL_CM_RETRY,
.flow_control = IBNAL_FLOW_CONTROL,
};
/* Flag I'm getting involved with the CM... */
conn->ibc_state = IBNAL_CONN_CONNECTING;
- CDEBUG(D_NET, "Connecting to, service id "LPX64", on "LPX64"\n",
- conn->ibc_connreq->cr_service.service_id,
- *kibnal_service_nid_field(&conn->ibc_connreq->cr_service));
+ CDEBUG(D_NET, "Connecting to, service id "LPX64", on %s\n",
+ conn->ibc_connreq->cr_svcrsp.ibsr_svc_id,
+ libcfs_nid2str(peer->ibp_nid));
/* kibnal_connect_callback gets my conn ref */
status = ib_cm_connect (&conn->ibc_connreq->cr_connparam,
&conn->ibc_connreq->cr_path, NULL,
- conn->ibc_connreq->cr_service.service_id, 0,
+ conn->ibc_connreq->cr_svcrsp.ibsr_svc_id, 0,
kibnal_active_conn_callback, conn,
&conn->ibc_comm_id);
if (status != 0) {
- CERROR ("Connect: %d\n", status);
+ CERROR ("Connect %p -> %s failed: %d\n",
+ conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), status);
/* Back out state change: I've not got a CM comm_id yet... */
conn->ibc_state = IBNAL_CONN_INIT_QP;
- kibnal_connreq_done (conn, 1, status);
+ kibnal_connreq_done(conn, 1, status);
+ kibnal_conn_decref(conn); /* Drop callback's ref */
}
- out:
- /* return non-zero to prevent further callbacks */
- return 1;
-}
-
-void
-kibnal_service_get_callback (tTS_IB_CLIENT_QUERY_TID tid, int status,
- struct ib_common_attrib_service *resp, void *arg)
-{
- kib_conn_t *conn = arg;
-
- if (status != 0) {
- CERROR ("status %d\n", status);
- kibnal_connreq_done (conn, 1, status);
- return;
- }
-
- CDEBUG(D_NET, "Got status %d, service id "LPX64", on "LPX64"\n",
- status, resp->service_id,
- *kibnal_service_nid_field(resp));
-
- conn->ibc_connreq->cr_service = *resp;
-
- status = ib_cached_gid_get(kibnal_data.kib_device,
- kibnal_data.kib_port, 0,
- conn->ibc_connreq->cr_gid);
- LASSERT (status == 0);
-
- /* kibnal_pathreq_callback gets my conn ref */
- status = tsIbPathRecordRequest (kibnal_data.kib_device,
- kibnal_data.kib_port,
- conn->ibc_connreq->cr_gid,
- conn->ibc_connreq->cr_service.service_gid,
- conn->ibc_connreq->cr_service.service_pkey,
- 0,
- kibnal_tunables.kib_io_timeout * HZ,
- 0,
- kibnal_pathreq_callback, conn,
- &conn->ibc_connreq->cr_tid);
-
- if (status == 0)
- return;
-
- CERROR ("Path record request: %d\n", status);
- kibnal_connreq_done (conn, 1, status);
+ return 1; /* non-zero to prevent further callbacks */
}
void
kibnal_connect_peer (kib_peer_t *peer)
{
- kib_conn_t *conn = kibnal_create_conn();
+ kib_conn_t *conn;
int rc;
- LASSERT (peer->ibp_connecting != 0);
-
+ conn = kibnal_create_conn();
if (conn == NULL) {
CERROR ("Can't allocate conn\n");
kibnal_peer_connect_failed (peer, 1, -ENOMEM);
}
conn->ibc_peer = peer;
- atomic_inc (&peer->ibp_refcount);
+ kibnal_peer_addref(peer);
- PORTAL_ALLOC (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
+ LIBCFS_ALLOC (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
if (conn->ibc_connreq == NULL) {
CERROR ("Can't allocate connreq\n");
- kibnal_connreq_done (conn, 1, -ENOMEM);
+ kibnal_connreq_done(conn, 1, -ENOMEM);
+ kibnal_conn_decref(conn); /* drop my ref */
return;
}
memset(conn->ibc_connreq, 0, sizeof (*conn->ibc_connreq));
- kibnal_set_service_keys(&conn->ibc_connreq->cr_service, peer->ibp_nid);
+ rc = kibnal_make_svcqry(conn);
+ if (rc != 0) {
+ kibnal_connreq_done (conn, 1, rc);
+ kibnal_conn_decref(conn); /* drop my ref */
+ return;
+ }
- /* kibnal_service_get_callback gets my conn ref */
- rc = ib_service_get (kibnal_data.kib_device,
- kibnal_data.kib_port,
- &conn->ibc_connreq->cr_service,
- KIBNAL_SERVICE_KEY_MASK,
- kibnal_tunables.kib_io_timeout * HZ,
- kibnal_service_get_callback, conn,
- &conn->ibc_connreq->cr_tid);
-
+ rc = ib_cached_gid_get(kibnal_data.kib_device,
+ kibnal_data.kib_port, 0,
+ conn->ibc_connreq->cr_gid);
+ LASSERT (rc == 0);
+
+ /* kibnal_pathreq_callback gets my conn ref */
+ rc = tsIbPathRecordRequest (kibnal_data.kib_device,
+ kibnal_data.kib_port,
+ conn->ibc_connreq->cr_gid,
+ conn->ibc_connreq->cr_svcrsp.ibsr_svc_gid,
+ conn->ibc_connreq->cr_svcrsp.ibsr_svc_pkey,
+ 0,
+ *kibnal_tunables.kib_timeout * HZ,
+ 0,
+ kibnal_pathreq_callback, conn,
+ &conn->ibc_connreq->cr_tid);
if (rc == 0)
- return;
+ return; /* callback now has my ref on conn */
- CERROR ("ib_service_get: %d\n", rc);
- kibnal_connreq_done (conn, 1, rc);
+ CERROR ("Path record request %p -> %s failed: %d\n",
+ conn, libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
+ kibnal_connreq_done(conn, 1, rc);
+ kibnal_conn_decref(conn); /* drop my ref */
}
int
-kibnal_conn_timed_out (kib_conn_t *conn)
+kibnal_check_txs (kib_conn_t *conn, struct list_head *txs)
{
kib_tx_t *tx;
struct list_head *ttmp;
unsigned long flags;
+ int timed_out = 0;
spin_lock_irqsave (&conn->ibc_lock, flags);
- list_for_each (ttmp, &conn->ibc_tx_queue) {
+ list_for_each (ttmp, txs) {
tx = list_entry (ttmp, kib_tx_t, tx_list);
- LASSERT (!tx->tx_passive_rdma_wait);
- LASSERT (tx->tx_sending == 0);
+ if (txs == &conn->ibc_active_txs) {
+ LASSERT (tx->tx_passive_rdma ||
+ !tx->tx_passive_rdma_wait);
- if (time_after_eq (jiffies, tx->tx_deadline)) {
- spin_unlock_irqrestore (&conn->ibc_lock, flags);
- return 1;
+ LASSERT (tx->tx_passive_rdma_wait ||
+ tx->tx_sending != 0);
+ } else {
+ LASSERT (!tx->tx_passive_rdma_wait);
+ LASSERT (tx->tx_sending == 0);
}
- }
-
- list_for_each (ttmp, &conn->ibc_active_txs) {
- tx = list_entry (ttmp, kib_tx_t, tx_list);
-
- LASSERT (tx->tx_passive_rdma ||
- !tx->tx_passive_rdma_wait);
-
- LASSERT (tx->tx_passive_rdma_wait ||
- tx->tx_sending != 0);
-
+
if (time_after_eq (jiffies, tx->tx_deadline)) {
- spin_unlock_irqrestore (&conn->ibc_lock, flags);
- return 1;
+ timed_out = 1;
+ break;
}
}
spin_unlock_irqrestore (&conn->ibc_lock, flags);
+ return timed_out;
+}
- return 0;
+int
+kibnal_conn_timed_out (kib_conn_t *conn)
+{
+ return kibnal_check_txs(conn, &conn->ibc_tx_queue) ||
+ kibnal_check_txs(conn, &conn->ibc_tx_queue_rsrvd) ||
+ kibnal_check_txs(conn, &conn->ibc_tx_queue_nocred) ||
+ kibnal_check_txs(conn, &conn->ibc_active_txs);
}
void
kib_peer_t *peer;
kib_conn_t *conn;
struct list_head *ctmp;
+ unsigned long flags;
again:
/* NB. We expect to have a look at all the peers and not find any
* rdmas to time out, so we just use a shared lock while we
* take a look... */
- read_lock (&kibnal_data.kib_global_lock);
+ read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
list_for_each (ptmp, peers) {
peer = list_entry (ptmp, kib_peer_t, ibp_list);
if (!kibnal_conn_timed_out(conn))
continue;
- CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
- conn, conn->ibc_state, peer->ibp_nid,
- atomic_read (&conn->ibc_refcount));
+ kibnal_conn_addref(conn);
- atomic_inc (&conn->ibc_refcount);
- read_unlock (&kibnal_data.kib_global_lock);
+ read_unlock_irqrestore(&kibnal_data.kib_global_lock,
+ flags);
- CERROR("Timed out RDMA with "LPX64"\n",
- peer->ibp_nid);
+ CERROR("Timed out RDMA with %s\n",
+ libcfs_nid2str(peer->ibp_nid));
kibnal_close_conn (conn, -ETIMEDOUT);
- kibnal_put_conn (conn);
+ kibnal_conn_decref(conn);
/* start again now I've dropped the lock */
goto again;
}
}
- read_unlock (&kibnal_data.kib_global_lock);
+ read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
}
void
rc = ib_cm_disconnect (conn->ibc_comm_id);
if (rc != 0)
- CERROR ("Error %d disconnecting conn %p -> "LPX64"\n",
- rc, conn, conn->ibc_peer->ibp_nid);
+ CERROR ("Error %d disconnecting conn %p -> %s\n",
+ rc, conn, libcfs_nid2str(conn->ibc_peer->ibp_nid));
+
+ kibnal_peer_notify(conn->ibc_peer);
}
int
-kibnal_connd (void *arg)
+kibnal_reaper (void *arg)
{
wait_queue_t wait;
unsigned long flags;
kib_conn_t *conn;
- kib_peer_t *peer;
int timeout;
int i;
int peer_index = 0;
unsigned long deadline = jiffies;
- kportal_daemonize ("kibnal_connd");
- kportal_blockallsigs ();
+ cfs_daemonize ("kibnal_reaper");
+ cfs_block_allsigs ();
init_waitqueue_entry (&wait, current);
- spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
+ spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
- for (;;) {
- if (!list_empty (&kibnal_data.kib_connd_conns)) {
- conn = list_entry (kibnal_data.kib_connd_conns.next,
+ while (!kibnal_data.kib_shutdown) {
+ if (!list_empty (&kibnal_data.kib_reaper_conns)) {
+ conn = list_entry (kibnal_data.kib_reaper_conns.next,
kib_conn_t, ibc_list);
list_del (&conn->ibc_list);
- spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
+ spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);
switch (conn->ibc_state) {
case IBNAL_CONN_DEATHROW:
* callback and last ref reschedules it
* here... */
kibnal_terminate_conn(conn);
- kibnal_put_conn (conn);
+ kibnal_conn_decref(conn);
break;
-
+
+ case IBNAL_CONN_INIT_QP:
case IBNAL_CONN_ZOMBIE:
kibnal_destroy_conn (conn);
break;
LBUG();
}
- spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
+ spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
continue;
}
- if (!list_empty (&kibnal_data.kib_connd_peers)) {
- peer = list_entry (kibnal_data.kib_connd_peers.next,
- kib_peer_t, ibp_connd_list);
-
- list_del_init (&peer->ibp_connd_list);
- spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
-
- kibnal_connect_peer (peer);
- kibnal_put_peer (peer);
-
- spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
- }
-
- /* shut down and nobody left to reap... */
- if (kibnal_data.kib_shutdown &&
- atomic_read(&kibnal_data.kib_nconns) == 0)
- break;
-
- spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
+ spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);
/* careful with the jiffy wrap... */
while ((timeout = (int)(deadline - jiffies)) <= 0) {
* connection within (n+1)/n times the timeout
* interval. */
- if (kibnal_tunables.kib_io_timeout > n * p)
+ if (*kibnal_tunables.kib_timeout > n * p)
chunk = (chunk * n * p) /
- kibnal_tunables.kib_io_timeout;
+ *kibnal_tunables.kib_timeout;
if (chunk == 0)
chunk = 1;
deadline += p * HZ;
}
- kibnal_data.kib_connd_waketime = jiffies + timeout;
+ kibnal_data.kib_reaper_waketime = jiffies + timeout;
+
+ set_current_state (TASK_INTERRUPTIBLE);
+ add_wait_queue (&kibnal_data.kib_reaper_waitq, &wait);
+
+ schedule_timeout (timeout);
+
+ set_current_state (TASK_RUNNING);
+ remove_wait_queue (&kibnal_data.kib_reaper_waitq, &wait);
+
+ spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
+ }
+
+ spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);
+
+ kibnal_thread_fini ();
+ return (0);
+}
+
+int
+kibnal_connd (void *arg)
+{
+ long id = (long)arg;
+ char name[16];
+ wait_queue_t wait;
+ unsigned long flags;
+ kib_peer_t *peer;
+ kib_acceptsock_t *as;
+ int did_something;
+
+ snprintf(name, sizeof(name), "kibnal_connd_%02ld", id);
+ cfs_daemonize(name);
+ cfs_block_allsigs();
+
+ init_waitqueue_entry (&wait, current);
+
+ spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
+
+ while (!kibnal_data.kib_shutdown) {
+ did_something = 0;
+
+ if (!list_empty (&kibnal_data.kib_connd_acceptq)) {
+ as = list_entry (kibnal_data.kib_connd_acceptq.next,
+ kib_acceptsock_t, ibas_list);
+ list_del (&as->ibas_list);
+
+ spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
+
+ kibnal_handle_svcqry(as->ibas_sock);
+ kibnal_free_acceptsock(as);
+
+ spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
+ did_something = 1;
+ }
+
+ /* Only handle an outgoing connection request if there is someone left
+ * to handle an incoming svcqry */
+ if (!list_empty (&kibnal_data.kib_connd_peers) &&
+ ((kibnal_data.kib_connd_connecting + 1) <
+ *kibnal_tunables.kib_n_connd)) {
+ peer = list_entry (kibnal_data.kib_connd_peers.next,
+ kib_peer_t, ibp_connd_list);
+
+ list_del_init (&peer->ibp_connd_list);
+ kibnal_data.kib_connd_connecting++;
+ spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
+
+ kibnal_connect_peer (peer);
+ kibnal_peer_decref(peer);
+
+ spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
+ did_something = 1;
+ kibnal_data.kib_connd_connecting--;
+ }
+
+ if (did_something)
+ continue;
set_current_state (TASK_INTERRUPTIBLE);
- add_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
+ add_wait_queue_exclusive(&kibnal_data.kib_connd_waitq, &wait);
- if (!kibnal_data.kib_shutdown &&
- list_empty (&kibnal_data.kib_connd_conns) &&
- list_empty (&kibnal_data.kib_connd_peers))
- schedule_timeout (timeout);
+ spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
+
+ schedule();
set_current_state (TASK_RUNNING);
remove_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
int did_something;
snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
- kportal_daemonize(name);
- kportal_blockallsigs();
+ cfs_daemonize(name);
+ cfs_block_allsigs();
spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
- for (;;) {
+ while (!kibnal_data.kib_shutdown) {
did_something = 0;
while (!list_empty(&kibnal_data.kib_sched_txq)) {
flags);
}
- /* shut down and no receives to complete... */
- if (kibnal_data.kib_shutdown &&
- atomic_read(&kibnal_data.kib_nconns) == 0)
- break;
-
/* nothing to do or hogging CPU */
if (!did_something || counter++ == IBNAL_RESCHED) {
spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
counter = 0;
if (!did_something) {
- rc = wait_event_interruptible(
+ rc = wait_event_interruptible_exclusive(
kibnal_data.kib_sched_waitq,
!list_empty(&kibnal_data.kib_sched_txq) ||
!list_empty(&kibnal_data.kib_sched_rxq) ||
- (kibnal_data.kib_shutdown &&
- atomic_read (&kibnal_data.kib_nconns) == 0));
+ kibnal_data.kib_shutdown);
} else {
our_cond_resched();
}
kibnal_thread_fini();
return (0);
}
-
-
-lib_nal_t kibnal_lib = {
- libnal_data: &kibnal_data, /* NAL private data */
- libnal_send: kibnal_send,
- libnal_send_pages: kibnal_send_pages,
- libnal_recv: kibnal_recv,
- libnal_recv_pages: kibnal_recv_pages,
- libnal_dist: kibnal_dist
-};