From 70eeae26fa3ce8382b6862281cb4342bca4d3515 Mon Sep 17 00:00:00 2001 From: eeb Date: Fri, 17 Dec 2004 14:35:56 +0000 Subject: [PATCH] * Added ranal subdir --- lnet/klnds/ralnd/.cvsignore | 10 + lnet/klnds/ralnd/Makefile.in | 6 + lnet/klnds/ralnd/autoMakefile.am | 15 + lnet/klnds/ralnd/ralnd.c | 1978 ++++++++++++++++++++++++++++ lnet/klnds/ralnd/ralnd.h | 438 ++++++ lnet/klnds/ralnd/ralnd_cb.c | 1754 ++++++++++++++++++++++++ lustre/portals/knals/ranal/.cvsignore | 10 + lustre/portals/knals/ranal/Makefile.in | 6 + lustre/portals/knals/ranal/autoMakefile.am | 15 + lustre/portals/knals/ranal/ranal.c | 1978 ++++++++++++++++++++++++++++ lustre/portals/knals/ranal/ranal.h | 438 ++++++ lustre/portals/knals/ranal/ranal_cb.c | 1754 ++++++++++++++++++++++++ 12 files changed, 8402 insertions(+) create mode 100644 lnet/klnds/ralnd/.cvsignore create mode 100644 lnet/klnds/ralnd/Makefile.in create mode 100644 lnet/klnds/ralnd/autoMakefile.am create mode 100644 lnet/klnds/ralnd/ralnd.c create mode 100644 lnet/klnds/ralnd/ralnd.h create mode 100644 lnet/klnds/ralnd/ralnd_cb.c create mode 100644 lustre/portals/knals/ranal/.cvsignore create mode 100644 lustre/portals/knals/ranal/Makefile.in create mode 100644 lustre/portals/knals/ranal/autoMakefile.am create mode 100644 lustre/portals/knals/ranal/ranal.c create mode 100644 lustre/portals/knals/ranal/ranal.h create mode 100644 lustre/portals/knals/ranal/ranal_cb.c diff --git a/lnet/klnds/ralnd/.cvsignore b/lnet/klnds/ralnd/.cvsignore new file mode 100644 index 0000000..5ed596b --- /dev/null +++ b/lnet/klnds/ralnd/.cvsignore @@ -0,0 +1,10 @@ +.deps +Makefile +.*.cmd +autoMakefile.in +autoMakefile +*.ko +*.mod.c +.*.flags +.tmp_versions +.depend diff --git a/lnet/klnds/ralnd/Makefile.in b/lnet/klnds/ralnd/Makefile.in new file mode 100644 index 0000000..1772cc2 --- /dev/null +++ b/lnet/klnds/ralnd/Makefile.in @@ -0,0 +1,6 @@ +MODULES := kranal +kranal-objs := ranal.o ranal_cb.o + +EXTRA_POST_CFLAGS := @RACPPFLAGS@ + +@INCLUDE_RULES@ diff --git a/lnet/klnds/ralnd/autoMakefile.am b/lnet/klnds/ralnd/autoMakefile.am new file mode 100644 index 0000000..f136aa5 --- /dev/null +++ b/lnet/klnds/ralnd/autoMakefile.am @@ -0,0 +1,15 @@ +# Copyright (C) 2001 Cluster File Systems, Inc. +# +# This code is issued under the GNU General Public License. +# See the file COPYING in this distribution + +if MODULES +if !CRAY_PORTALS +if BUILD_RANAL +modulenet_DATA = kranal$(KMODEXT) +endif +endif +endif + +MOSTLYCLEANFILES = *.o *.ko *.mod.c +DIST_SOURCES = $(kranal-objs:%.o=%.c) ranal.h diff --git a/lnet/klnds/ralnd/ralnd.c b/lnet/klnds/ralnd/ralnd.c new file mode 100644 index 0000000..a59757d --- /dev/null +++ b/lnet/klnds/ralnd/ralnd.c @@ -0,0 +1,1978 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2004 Cluster File Systems, Inc. + * Author: Eric Barton + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ +#include "ranal.h" + + +nal_t kranal_api; +ptl_handle_ni_t kranal_ni; +kra_data_t kranal_data; +kra_tunables_t kranal_tunables; + +#ifdef CONFIG_SYSCTL +#define RANAL_SYSCTL_TIMEOUT 1 +#define RANAL_SYSCTL_LISTENER_TIMEOUT 2 +#define RANAL_SYSCTL_BACKLOG 3 +#define RANAL_SYSCTL_PORT 4 +#define RANAL_SYSCTL_MAX_IMMEDIATE 5 + +#define RANAL_SYSCTL 202 + +static ctl_table kranal_ctl_table[] = { + {RANAL_SYSCTL_TIMEOUT, "timeout", + &kranal_tunables.kra_timeout, sizeof(int), + 0644, NULL, &proc_dointvec}, + {RANAL_SYSCTL_LISTENER_TIMEOUT, "listener_timeout", + &kranal_tunables.kra_listener_timeout, sizeof(int), + 0644, NULL, &proc_dointvec}, + {RANAL_SYSCTL_BACKLOG, "backlog", + &kranal_tunables.kra_backlog, sizeof(int), + 0644, NULL, kranal_listener_procint}, + {RANAL_SYSCTL_PORT, "port", + &kranal_tunables.kra_port, sizeof(int), + 0644, NULL, kranal_listener_procint}, + {RANAL_SYSCTL_MAX_IMMEDIATE, "max_immediate", + &kranal_tunables.kra_max_immediate, sizeof(int), + 0644, NULL, &proc_dointvec}, + { 0 } +}; + +static ctl_table kranal_top_ctl_table[] = { + {RANAL_SYSCTL, "ranal", NULL, 0, 0555, kranal_ctl_table}, + { 0 } +}; +#endif + +int +kranal_sock_write (struct socket *sock, void *buffer, int nob) +{ + int rc; + mm_segment_t oldmm = get_fs(); + struct iovec iov = { + .iov_base = buffer, + .iov_len = nob + }; + struct msghdr msg = { + .msg_name = NULL, + .msg_namelen = 0, + .msg_iov = &iov, + .msg_iovlen = 1, + .msg_control = NULL, + .msg_controllen = 0, + .msg_flags = MSG_DONTWAIT + }; + + /* We've set up the socket's send buffer to be large enough for + * everything we send, so a single non-blocking send should + * complete without error. 
*/ + + set_fs(KERNEL_DS); + rc = sock_sendmsg(sock, &msg, iov.iov_len); + set_fs(oldmm); + + return rc; +} + +int +kranal_sock_read (struct socket *sock, void *buffer, int nob, int timeout) +{ + int rc; + mm_segment_t oldmm = get_fs(); + long ticks = timeout * HZ; + unsigned long then; + struct timeval tv; + + LASSERT (nob > 0); + LASSERT (ticks > 0); + + for (;;) { + struct iovec iov = { + .iov_base = buffer, + .iov_len = nob + }; + struct msghdr msg = { + .msg_name = NULL, + .msg_namelen = 0, + .msg_iov = &iov, + .msg_iovlen = 1, + .msg_control = NULL, + .msg_controllen = 0, + .msg_flags = 0 + }; + + /* Set receive timeout to remaining time */ + tv = (struct timeval) { + .tv_sec = ticks / HZ, + .tv_usec = ((ticks % HZ) * 1000000) / HZ; + }; + set_fs(KERNEL_DS); + rc = sock_setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, + (char *)&tv, sizeof(tv)); + set_fs(oldmm); + if (rc != 0) { + CERROR("Can't set socket recv timeout %d: %d\n", + send_timeout, rc); + return rc; + } + + set_fs(KERNEL_DS); + then = jiffies; + rc = sock_recvmsg(sock, &msg, iov.iov_len, 0); + ticks -= jiffies - then; + set_fs(oldmm); + + if (rc < 0) + return rc; + + if (rc == 0) + return -ECONNABORTED; + + buffer = ((char *)buffer) + rc; + nob -= rc; + + if (nob == 0) + return 0; + + if (ticks <= 0) + return -ETIMEDOUT; + } +} + +int +kranal_create_sock(struct socket **sockp) +{ + struct socket *sock; + int rc; + struct timeval tv; + int option; + mm_segment_t oldmm = get_fs(); + + rc = sock_create(PF_INET, SOCK_STREAM, 0, &sock); + if (rc != 0) { + CERROR("Can't create socket: %d\n", rc); + return rc; + } + + /* Ensure sending connection info doesn't block */ + option = 2 * sizeof(kra_connreq_t); + set_fs(KERNEL_DS); + rc = sock_setsockopt(sock, SOL_SOCKET, SO_SNDBUF, + (char *)&option, sizeof(option)); + set_fs(oldmm); + if (rc != 0) { + CERROR("Can't set send buffer %d: %d\n", option, rc); + goto failed; + } + + option = 1; + set_fs(KERNEL_DS); + rc = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + (char *)&option, sizeof(option)); + set_fs(oldmm); + if (rc != 0) { + CERROR("Can't set SO_REUSEADDR: %d\n", rc); + goto failed; + } + + *sockp = sock; + return 0; + + failed: + sock_release(sock); + return rc; +} + +void +kranal_pause(int ticks) +{ + set_current_state(TASK_UNINTERRUPTIBLE); + schedule_timeout(ticks); +} + +void +kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn) +{ + memset(connreq, 0, sizeof(*connreq)); + + connreq->racr_magic = RANAL_MSG_MAGIC; + connreq->racr_version = RANAL_MSG_VERSION; + connreq->racr_devid = conn->rac_device->rad_id; + connreq->racr_nid = kranal_lib.libnal_ni.ni_pid.nid; + connreq->racr_timeout = conn->rac_timeout; + connreq->racr_incarnation = conn->rac_my_incarnation; + + rrc = RapkGetRiParams(conn->rac_rihandle, &connreq->racr_riparams); + LASSERT(rrc == RAP_SUCCESS); +} + +int +kranal_recv_connreq(struct sock *sock, kra_connreq_t *connreq, int timeout) +{ + int i; + int rc; + + rc = kranal_sock_read(newsock, connreq, sizeof(*connreq), timeout); + if (rc != 0) { + CERROR("Read failed: %d\n", rc); + return rc; + } + + if (connreq->racr_magic != RANAL_MSG_MAGIC) { + if (__swab32(connreq->racr_magic) != RANAL_MSG_MAGIC) { + CERROR("Unexpected magic %08x\n", connreq->racr_magic); + return -EPROTO; + } + + __swab32s(&connreq->racr_magic); + __swab16s(&connreq->racr_version); + __swab16s(&connreq->racr_devid); + __swab64s(&connreq->racr_nid); + __swab64s(&connreq->racr_incarnation); + __swab32s(&connreq->racr_timeout); + + __swab32s(&connreq->racr_riparams.FmaDomainHndl); + 
__swab32s(&connreq->racr_riparams.RcvCqHndl); + __swab32s(&connreq->racr_riparams.PTag); + __swab32s(&connreq->racr_riparams.CompletionCookie); + } + + if (connreq->racr_version != RANAL_MSG_VERSION) { + CERROR("Unexpected version %d\n", connreq->racr_version); + return -EPROTO; + } + + if (connreq->racr_nid == PTL_NID_ANY) { + CERROR("Received PTL_NID_ANY\n"); + return -EPROTO; + } + + if (connreq->racr_timeout < RANAL_MIN_TIMEOUT) { + CERROR("Received timeout %d < MIN %d\n", + connreq->racr_timeout, RANAL_MIN_TIMEOUT); + return -EPROTO; + } + + for (i = 0; i < kranal_data.kra_ndevs; i++) + if (connreq->racr_devid == + kranal_data.kra_devices[i]->rad_id) + break; + + if (i == kranal_data.kra_ndevs) { + CERROR("Can't match device %d\n", connreq->racr_devid); + return -ENODEV; + } + + return 0; +} + +int +kranal_conn_isdup_locked(kranal_peer_t *peer, __u64 incarnation) +{ + kra_conn_t *conn; + struct list_head *tmp; + int loopback = 0; + + list_for_each(tmp, &peer->rap_conns) { + conn = list_entry(tmp, kra_conn_t, rac_list); + + if (conn->rac_incarnation < incarnation) { + /* Conns with an older incarnation get culled later */ + continue; + } + + if (!loopback && + conn->rac_incarnation == incarnation && + peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid) { + /* loopback creates 2 conns */ + loopback = 1; + continue; + } + + return 1; + } + + return 0; +} + +void +kranal_set_conn_uniqueness (kra_conn_t *conn) +{ + unsigned long flags; + + write_lock_irqsave(&kranal_data.kra_global_lock, flags); + + conn->rac_my_incarnation = kranal_data.kra_next_incarnation++; + + do { /* allocate a unique cqid */ + conn->rac_cqid = kranal_data.kra_next_cqid++; + } while (kranal_cqid2conn_locked(conn->rac_cqid) != NULL) + + + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); +} + +int +kranal_alloc_conn(kra_conn_t **connp, kra_device_t *dev) +{ + kra_conn_t *conn; + RAP_RETURN rrc; + + LASSERT (!in_interrupt()); + PORTAL_ALLOC(conn, sizeof(*conn)); + + if (conn == NULL) + return -ENOMEM; + + memset(conn, 0, sizeof(*conn)); + conn->rac_cqid = cqid; + atomic_set(&conn->rac_refcount, 1); + INIT_LIST_HEAD(&conn->rac_list); + INIT_LIST_HEAD(&conn->rac_hashlist); + INIT_LIST_HEAD(&conn->rac_fmaq); + INIT_LIST_HEAD(&conn->rac_rdmaq); + INIT_LIST_HEAD(&conn->rac_replyq); + spin_lock_init(&conn->rac_lock); + + conn->rac_timeout = MAX(kranal_tunables.kra_timeout, RANAL_MIN_TIMEOUT); + kranal_update_reaper_timeout(conn->rac_timeout); + + rrc = RapkCreateRi(dev->rad_handle, cqid, dev->rad_ptag, + dev->rad_rdma_cq, dev->rad_fma_cq, + &conn->rac_rihandle); + if (rrc != RAP_SUCCESS) { + CERROR("RapkCreateRi failed: %d\n", rrc); + PORTAL_FREE(conn, sizeof(*conn)); + return -ENETDOWN; + } + + atomic_inc(&kranal_data.kra_nconns); + *connp = conn; + return 0; +} + +void +__kranal_conn_decref(kra_conn_t *conn) +{ + kra_tx_t *tx; + RAP_RETURN rrc; + + LASSERT (!in_interrupt()); + LASSERT (!conn->rac_scheduled); + LASSERT (list_empty(&conn->rac_list)); + LASSERT (list_empty(&conn->rac_hashlist)); + LASSERT (atomic_read(&conn->rac_refcount) == 0); + + while (!list_empty(&conn->rac_fmaq)) { + tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list); + + list_del(&tx->tx_list); + kranal_tx_done(tx, -ECONNABORTED); + } + + /* We may not destroy this connection while it has RDMAs outstanding */ + LASSERT (list_empty(&conn->rac_rdmaq)); + + while (!list_empty(&conn->rac_replyq)) { + tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list); + + list_del(&tx->tx_list); + kranal_tx_done(tx, -ECONNABORTED); + } + + rrc = 
RapkDestroyRi(conn->rac_device->rad_handle, + conn->rac_rihandle); + LASSERT (rrc == RAP_SUCCESS); + + if (conn->rac_peer != NULL) + kranal_peer_decref(conn->rac_peer); + + PORTAL_FREE(conn, sizeof(*conn)); + atomic_dec(&kranal_data.kra_nconns); +} + +void +kranal_terminate_conn_locked (kra_conn_t *conn) +{ + kra_peer_t *peer - conn->rac_peer; + + LASSERT (!in_interrupt()); + LASSERT (conn->rac_closing); + LASSERT (!list_empty(&conn->rac_hashlist)); + LASSERT (list_empty(&conn->rac_list)); + + /* Remove from conn hash table (no new callbacks) */ + list_del_init(&conn->rac_hashlist); + kranal_conn_decref(conn); + + /* Conn is now just waiting for remaining refs to go */ +} + +void +kranal_close_conn_locked (kra_conn_t *conn, int error) +{ + kra_peer_t *peer = conn->rac_peer; + + CDEBUG(error == 0 ? D_NET : D_ERROR, + "closing conn to "LPX64": error %d\n", peer->rap_nid, error); + + LASSERT (!in_interrupt()); + LASSERT (!conn->rac_closing); + LASSERT (!list_empty(&conn->rac_hashlist)); + LASSERT (!list_empty(&conn->rac_list)); + + list_del_init(&conn->rac_list); + + if (list_empty(&peer->rap_conns) && + peer->rap_persistence == 0) { + /* Non-persistent peer with no more conns... */ + kranal_unlink_peer_locked(peer); + } + + conn->rac_closing = 1; + kranal_schedule_conn(conn); + + kranal_conn_decref(conn); /* lose peer's ref */ +} + +void +kranal_close_conn (kra_conn_t *conn, int error) +{ + unsigned long flags; + + + write_lock_irqsave(&kranal_data.kra_global_lock, flags); + + if (!conn->rac_closing) + kranal_close_conn_locked(conn, error); + + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); +} + +int +kranal_passive_conn_handshake (struct socket *sock, + ptl_nid_t **peer_nidp, kra_conn_t **connp) +{ + struct sockaddr_in addr; + __u32 peer_ip; + unsigned int peer_port; + kra_connreq_t connreq; + ptl_nid_t peer_nid; + kra_conn_t *conn; + kra_device_t *dev; + RAP_RETURN rrc; + int rc; + int i; + + rc = sock->ops->getname(newsock, (struct sockaddr *)addr, &len, 2); + if (rc != 0) { + CERROR("Can't get peer's IP: %d\n", rc); + return rc; + } + + peer_ip = ntohl(sin.sin_addr.s_addr); + peer_port = ntohs(sin.sin_port); + + if (peer_port >= 1024) { + CERROR("Refusing unprivileged connection from %u.%u.%u.%u/%d\n", + HIPQUAD(peer_ip), peer_port); + return -ECONNREFUSED; + } + + rc = kranal_recv_connreq(sock, &connreq, + kranal_data.kra_listener_timeout); + if (rc != 0) { + CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", + HIPQUAD(peer_ip), peer_port, rc); + return rc; + } + + peer_nid = connreq.racr_nid; + LASSERT (peer_nid != PTL_NID_ANY); + + for (i = 0;;i++) { + LASSERT(i < kranal_data.kra_ndevs); + dev = &kranal_data.kra_devices[i]; + if (dev->rad_id == connreq->racr_devid) + break; + } + + rc = kranal_alloc_conn(&conn, dev,(__u32)(peer_nid & 0xffffffff)); + if (rc != 0) + return rc; + + conn->rac_peer_incarnation = connreq.racr_incarnation; + conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq.racr_timeout); + kranal_update_reaper_timeout(conn->rac_keepalive); + + rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams); + if (rrc != RAP_SUCCESS) { + CERROR("Can't set riparams for "LPX64": %d\n", peer_nid, rrc); + kranal_conn_decref(conn); + return -EPROTO; + } + + kranal_pack_connreq(&connreq, conn); + + rc = kranal_sock_write(sock, &connreq, sizeof(connreq)); + if (rc != 0) { + CERROR("Can't tx connreq to %u.%u.%u.%u/%p: %d\n", + HIPQUAD(peer_ip), peer_port, rc); + kranal_conn_decref(conn); + return rc; + } + + *connp = conn; + *peer_nidp = peer_nid; + return 0; 
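+        /* On success the caller owns *connp (its RI parameters were taken
+         * from the peer's connreq) and *peer_nidp; our own connreq has
+         * already been written back on the socket so the active side can
+         * complete the exchange. */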
+} + +int +ranal_connect_sock(kra_peer_t *peer, struct socket **sockp) +{ + struct sockaddr_in locaddr; + struct sockaddr_in srvaddr; + struct socket *sock; + unsigned int port; + int rc; + int option; + mm_segment_t oldmm = get_fs(); + struct timeval tv; + + for (port = 1023; port >= 512; port--) { + + memset(&locaddr, 0, sizeof(locaddr)); + locaddr.sin_family = AF_INET; + locaddr.sin_port = htons(port); + locaddr.sin_addr.s_addr = htonl(INADDR_ANY); + + memset (&srvaddr, 0, sizeof (srvaddr)); + srvaddr.sin_family = AF_INET; + srvaddr.sin_port = htons (peer->rap_port); + srvaddr.sin_addr.s_addr = htonl (peer->rap_ip); + + rc = kranal_create_sock(&sock); + if (rc != 0) + return rc; + + rc = sock->ops->bind(sock, + (struct sockaddr *)&locaddr, sizeof(locaddr)); + if (rc != 0) { + sock_release(sock); + + if (rc == -EADDRINUSE) { + CDEBUG(D_NET, "Port %d already in use\n", port); + continue; + } + + CERROR("Can't bind to reserved port %d: %d\n", port, rc); + return rc; + } + + rc = sock->ops->connect(sock, + (struct sockaddr *)&srvaddr, sizeof(srvaddr), + 0); + if (rc == 0) { + *sockp = sock; + return 0; + } + + sock_release(sock); + + if (rc != -EADDRNOTAVAIL) { + CERROR("Can't connect port %d to %u.%u.%u.%u/%d: %d\n", + port, HIPQUAD(peer->rap_ip), peer->rap_port, rc); + return rc; + } + + CDEBUG(D_NET, "Port %d not available for %u.%u.%u.%u/%d\n", + port, HIPQUAD(peer->rap_ip), peer->rap_port); + } +} + + +int +kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp) +{ + kra_connreq_t connreq; + kra_conn_t *conn; + kra_device_t *dev; + struct socket *sock; + __u32 id32; + RAP_RETURN rrc; + int rc; + + id32 = (peer_nid & 0xffffffff); + dev = &kranal_data.kra_devices[id32 % kranal_data.kra_ndevs]; + + rc = kranal_alloc_conn(&conn, dev, id32); + if (rc != 0) + return rc; + + kranal_pack_connreq(&connreq, conn); + + memset(&dstaddr, 0, sizeof(addr)); + dstaddr.sin_family = AF_INET; + dstaddr.sin_port = htons(peer->rap_port); + dstaddr.sin_addr.s_addr = htonl(peer->rap_ip); + + memset(&srcaddr, 0, sizeof(addr)); + + rc = ranal_connect_sock(peer, &sock); + if (rc != 0) + goto failed_0; + + /* CAVEAT EMPTOR: the passive side receives with a SHORT rx timeout + * immediately after accepting a connection, so we connect and then + * send immediately. 
*/ + + rc = kranal_sock_write(sock, &connreq, sizeof(connreq)); + if (rc != 0) { + CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n", + HIPQUAD(peer->rap_ip), peer->rap_port, rc); + goto failed_1; + } + + rc = kranal_recv_connreq(sock, &connreq, kranal_data.kra_timeout); + if (rc != 0) { + CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", + HIPQUAD(peer->rap_ip), peer->rap_port, rc); + goto failed_1; + } + + sock_release(sock); + rc = -EPROTO; + + if (connreq.racr_nid != peer->rap_nid) { + CERROR("Unexpected nid from %u.%u.%u.%u/%d: " + "received "LPX64" expected "LPX64"\n", + HIPQUAD(peer->rap_ip), peer->rap_port, + connreq.racr_nid, peer->rap_nid); + goto failed_0; + } + + if (connreq.racr_devid != dev->rad_id) { + CERROR("Unexpected device id from %u.%u.%u.%u/%d: " + "received %d expected %d\n", + HIPQUAD(peer->rap_ip), peer->rap_port, + connreq.racr_devid, dev->rad_id); + goto failed_0; + } + + conn->rac_peer_incarnation = connreq.racr_incarnation; + conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq.racr_timeout); + kranal_update_reaper_timeout(conn->rac_keepalive); + + rc = -ENETDOWN; + rrc = RapkSetRiParams(conn->rac_rihandle, + &connreq->racr_riparams); + if (rrc != RAP_SUCCESS) { + CERROR("Can't set riparams for "LPX64": %d\n", + peer_nid, rrc); + goto failed_0; + } + + *connp = conn; + return 0; + + failed_1: + release_sock(sock); + failed_0: + kranal_conn_decref(conn); + return rc; +} + +int +kranal_conn_handshake (struct socket *sock, kranal_peer_t *peer) +{ + kranal_peer_t *peer2; + ptl_nid_t peer_nid; + unsigned long flags; + unsigned long timeout; + kra_conn_t *conn; + int rc; + int nstale; + + if (sock != NULL) { + /* passive: listener accepted sock */ + LASSERT (peer == NULL); + + rc = kranal_passive_conn_handshake(sock, &peer_nid, &conn); + if (rc != 0) + return rc; + + /* assume this is a new peer */ + peer = kranal_create_peer(peer_nid); + if (peer == NULL) { + CERROR("Can't allocate peer for "LPX64"\n", peer_nid); + kranal_conn_decref(conn); + return -ENOMEM; + } + + write_lock_irqsave(&kranal_data.kra_global_lock, flags); + + peer2 = kranal_find_peer_locked(peer_nid); + if (peer2 == NULL) { + /* peer table takes my initial ref on peer */ + list_add_tail(&peer->rap_list, + kranal_nid2peerlist(peer_nid)); + } else { + /* peer_nid already in the peer table */ + kranal_peer_decref(peer); + peer = peer2; + } + /* NB I may now have a non-persistent peer in the peer + * table with no connections: I can't drop the global lock + * until I've given it a connection or removed it, and when + * I do 'peer' can disappear under me. */ + } else { + /* active: connd wants to connect to peer */ + LASSERT (peer != NULL); + LASSERT (peer->rap_connecting); + + rc = kranal_active_conn_handshake(peer, &conn); + if (rc != 0) + return rc; + + write_lock_irqsave(&kranal_data.kra_global_lock, flags); + + if (!kranal_peer_active(peer)) { + /* raced with peer getting unlinked */ + write_unlock_irqrestore(&kranal_data.kra_global_lock, + flags); + kranal_conn_decref(conn); + return ESTALE; + } + } + + LASSERT (kranal_peer_active(peer)); /* peer is in the peer table */ + peer_nid = peer->rap_nid; + + /* Refuse to duplicate an existing connection (both sides might try + * to connect at once). NB we return success! 
We _do_ have a + * connection (so we don't need to remove the peer from the peer + * table) and we _don't_ have any blocked txs to complete */ + if (kranal_conn_isdup_locked(peer, conn->rac_incarnation)) { + LASSERT (!list_empty(&peer->rap_conns)); + LASSERT (list_empty(&peer->rap_tx_queue)); + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); + CWARN("Not creating duplicate connection to "LPX64"\n", + peer_nid); + kranal_conn_decref(conn); + return 0; + } + + kranal_peer_addref(peer); /* +1 ref for conn */ + conn->rac_peer = peer; + list_add_tail(&conn->rac_list, &peer->rap_conns); + + kranal_conn_addref(conn); /* +1 ref for conn table */ + list_add_tail(&conn->rac_hashlist, + kranal_cqid2connlist(conn->rac_cqid)); + + /* Schedule all packets blocking for a connection */ + while (!list_empty(&peer->rap_tx_queue)) { + tx = list_entry(&peer->rap_tx_queue.next, + kra_tx_t, tx_list); + + list_del(&tx->tx_list); + kranal_queue_tx_locked(tx, conn); + } + + nstale = kranal_close_stale_conns_locked(peer, conn->rac_incarnation); + + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); + + /* CAVEAT EMPTOR: passive peer can disappear NOW */ + + if (nstale != 0) + CWARN("Closed %d stale conns to "LPX64"\n", nstale, peer_nid); + + /* Ensure conn gets checked. Transmits may have been queued and an + * FMA event may have happened before it got in the cq hash table */ + kranal_schedule_conn(conn); + return 0; +} + +void +kranal_connect (kra_peer_t *peer) +{ + kra_tx_t *tx; + unsigned long flags; + struct list_head zombies; + int rc; + + LASSERT (peer->rap_connecting); + + rc = kranal_conn_handshake(NULL, peer); + + write_lock_irqqsave(&kranal_data.kra_global_lock, flags); + + LASSERT (peer->rap_connecting); + peer->rap_connecting = 0; + + if (rc == 0) { + /* kranal_conn_handshake() queues blocked txs immediately on + * success to avoid messages jumping the queue */ + LASSERT (list_empty(&peer->rap_tx_queue)); + + /* reset reconnection timeouts */ + peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL; + peer->rap_reconnect_time = CURRENT_TIME; + + write_unlock_irqrestore(&kranal-data.kra_global_lock, flags); + return; + } + + LASSERT (peer->rap_reconnect_interval != 0); + peer->rap_reconnect_time = CURRENT_TIME + peer->rap_reconnect_interval; + peer->rap_reconnect_interval = MAX(RANAL_MAX_RECONNECT_INTERVAL, + 1 * peer->rap_reconnect_interval); + + /* Grab all blocked packets while we have the global lock */ + list_add(&zombies, &peer->rap_tx_queue); + list_del_init(&peer->rap_tx_queue); + + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); + + if (list_empty(&zombies)) + return; + + CWARN("Dropping packets for "LPX64": connection failed\n", + peer->rap_nid); + + do { + tx = list_entry(zombies.next, kra_tx_t, tx_list); + + list_del(&tx->tx_list); + kranal_tx_done(tx, -EHOSTUNREACH); + + } while (!list_empty(&zombies)); +} + +int +kranal_listener(void *arg) +{ + struct sockaddr_in addr; + wait_queue_t wait; + struct socket *sock; + struct socket *newsock; + int port; + int backlog; + int timeout; + kra_connreq_t *connreqs; + char name[16]; + + /* Parent thread holds kra_nid_mutex, and is, or is about to + * block on kra_listener_signal */ + + port = kra_tunables.kra_port; + snprintf(name, "kranal_lstn%03d", port); + kportal_daemonize(name); + kportal_blockallsigs(); + + init_waitqueue_entry(&wait, current); + + rc = -ENOMEM; + PORTAL_ALLOC(connreqs, 2 * sizeof(*connreqs)); + if (connreqs == NULL) + goto out_0; + + rc = kranal_create_sock(&sock, port); + if (rc != 
0) + goto out_1; + + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = INADDR_ANY + + rc = sock->ops->bind(sock, &addr, sizeof(addr)); + if (rc != 0) { + CERROR("Can't bind to port %d\n", port); + goto out_2; + } + + rc = sock->ops->listen(sock, kra_tunalbes.kra_backlog); + if (rc != 0) { + CERROR("Can't set listen backlog %d: %d\n", backlog, rc); + goto out_2; + } + + LASSERT (kranal_data.kra_listener_sock == NULL); + kranal_data.kra_listener_sock = sock; + + /* unblock waiting parent */ + LASSERT (kranal_data.kra_listener_shutdown == 0); + up(&kranal_data.kra_listener_signal); + + /* Wake me any time something happens on my socket */ + add_wait_queue(sock->sk->sk_sleep, &wait); + + while (kranal_data.kra_listener_shutdown == 0) { + + newsock = sock_alloc(); + if (newsock == NULL) { + CERROR("Can't allocate new socket for accept\n"); + kranal_pause(HZ); + continue; + } + + set_current_state(TASK_INTERRUPTIBLE); + + rc = sock->ops->accept(sock, newsock, O_NONBLOCK); + + if (rc == -EAGAIN && + kranal_data.kra_listener_shutdown == 0) + schedule(); + + set_current_state(TASK_RUNNING); + + if (rc != 0) { + sock_release(newsock); + if (rc != -EAGAIN) { + CERROR("Accept failed: %d\n", rc); + kranal_pause(HZ); + } + continue; + } + + kranal_conn_handshake(newsock, NULL); + sock_release(newsock); + } + + rc = 0; + remove_wait_queue(sock->sk->sk_sleep, &wait); + out_2: + sock_release(sock); + kranal_data.kra_listener_sock = NULL; + out_1: + PORTAL_FREE(connreqs, 2 * sizeof(*connreqs)); + out_0: + /* set completion status and unblock thread waiting for me + * (parent on startup failure, executioner on normal shutdown) */ + kranal_data.kra_listener_shutdown = rc; + up(&kranal_data.kra_listener_signal); + + return 0; +} + +int +kranal_start_listener () +{ + long pid; + int rc; + + CDEBUG(D_WARNING, "Starting listener\n"); + + /* Called holding kra_nid_mutex: listener stopped */ + LASSERT (kranal_data.kra_listener_sock == NULL); + + kranal_data.kra_listener_shutdown == 0; + pid = kernel_thread(kranal_listener, sock, 0); + if (pid < 0) { + CERROR("Can't spawn listener: %ld\n", pid); + return (int)pid; + } + + /* Block until listener has started up. */ + down(&kranal_data.kra_listener_signal); + + rc = kranal_data.kra_listener_shutdown; + LASSERT ((rc != 0) == (kranal_data.kra_listener_sock == NULL)); + + CDEBUG(D_WARNING, "Listener %ld started OK\n", pid); + return rc; +} + +void +kranal_stop_listener() +{ + CDEBUG(D_WARNING, "Stopping listener\n"); + + /* Called holding kra_nid_mutex: listener running */ + LASSERT (kranal_data.kra_listener_sock != NULL); + + kranal_data.kra_listener_shutdown = 1; + wake_up_all(kranal_data->kra_listener_sock->sk->sk_sleep); + + /* Block until listener has torn down. 
*/ + down(&kranal_data.kra_listener_signal); + + LASSERT (kranal_data.kra_listener_sock == NULL); + CDEBUG(D_WARNING, "Listener stopped\n"); +} + +int +kranal_listener_procint(ctl_table *table, int write, struct file *filp, + void *buffer, size_t *lenp) +{ + int *tunable = (int *)table->data; + int old_val; + int rc; + + down(&kranal_data.kra_nid_mutex); + + LASSERT (tunable == &kranal_data.kra_port || + tunable == &kranal_data.kra_backlog); + old_val = *tunable; + + rc = proc_dointvec(table, write, filp, buffer, lenp); + + if (write && + (*tunable != old_val || + kranal_data.kra_listener_sock == NULL)) { + + if (kranal_data.kra_listener_sock != NULL) + kranal_stop_listener(); + + rc = kranal_start_listener(); + + if (rc != 0) { + *tunable = old_val; + kranal_start_listener(); + } + } + + up(&kranal_data.kra_nid_mutex); + return rc; +} + +int +kranal_set_mynid(ptl_nid_t nid) +{ + lib_ni_t *ni = &kranal_lib.libnal_ni; + int rc; + + CDEBUG(D_NET, "setting mynid to "LPX64" (old nid="LPX64")\n", + nid, ni->ni_pid.nid); + + down(&kranal_data.kra_nid_mutex); + + if (nid == ni->ni_pid.nid) { + /* no change of NID */ + up(&kranal_data.kra_nid_mutex); + return 0; + } + + if (kranal_data.kra_listener_sock != NULL) + kranal_stop_listener(); + + ni->ni_pid.nid = nid; + + /* Delete all existing peers and their connections after new + * NID/incarnation set to ensure no old connections in our brave + * new world. */ + kranal_del_peer(PTL_NID_ANY, 0); + + if (nid != PTL_NID_ANY) + rc = kranal_start_listener(); + + up(&kranal_data.kra_nid_mutex); + return rc; +} + +kra_peer_t * +kranal_create_peer (ptl_nid_t nid) +{ + kra_peer_t *peer; + + LASSERT (nid != PTL_NID_ANY); + + PORTAL_ALLOC(peer, sizeof(*peer)); + if (peer == NULL) + return NULL; + + memset(peer, 0, sizeof(*peer)); /* zero flags etc */ + + peer->rap_nid = nid; + atomic_set(&peer->rap_refcount, 1); /* 1 ref for caller */ + + INIT_LIST_HEAD(&peer->rap_list); /* not in the peer table yet */ + INIT_LIST_HEAD(&peer->rap_conns); + INIT_LIST_HEAD(&peer->rap_tx_queue); + + peer->rap_reconnect_time = CURRENT_TIME; + peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL; + + atomic_inc(&kranal_data.kra_npeers); + return peer; +} + +void +__kranal_peer_decref (kra_peer_t *peer) +{ + CDEBUG(D_NET, "peer "LPX64" %p deleted\n", peer->rap_nid, peer); + + LASSERT (atomic_read(&peer->rap_refcount) == 0); + LASSERT (peer->rap_persistence == 0); + LASSERT (!kranal_peer_active(peer)); + LASSERT (peer->rap_connecting == 0); + LASSERT (list_empty(&peer->rap_conns)); + LASSERT (list_empty(&peer->rap_tx_queue)); + + PORTAL_FREE(peer, sizeof(*peer)); + + /* NB a peer's connections keep a reference on their peer until + * they are destroyed, so we can be assured that _all_ state to do + * with this peer has been cleaned up when its refcount drops to + * zero. 
*/ + atomic_dec(&kranal_data.kra_npeers); +} + +kra_peer_t * +kranal_find_peer_locked (ptl_nid_t nid) +{ + struct list_head *peer_list = kranal_nid2peerlist(nid); + struct list_head *tmp; + kra_peer_t *peer; + + list_for_each (tmp, peer_list) { + + peer = list_entry(tmp, kra_peer_t, rap_list); + + LASSERT (peer->rap_persistence > 0 || /* persistent peer */ + !list_empty(&peer->rap_conns)); /* active conn */ + + if (peer->rap_nid != nid) + continue; + + CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n", + peer, nid, atomic_read(&peer->rap_refcount)); + return peer; + } + return NULL; +} + +kra_peer_t * +kranal_find_peer (ptl_nid_t nid) +{ + kra_peer_t *peer; + + read_lock(&kranal_data.kra_global_lock); + peer = kranal_find_peer_locked(nid); + if (peer != NULL) /* +1 ref for caller? */ + kranal_peer_addref(peer); + read_unlock(&kranal_data.kra_global_lock); + + return peer; +} + +void +kranal_unlink_peer_locked (kra_peer_t *peer) +{ + LASSERT (peer->rap_persistence == 0); + LASSERT (list_empty(&peer->rap_conns)); + + LASSERT (kranal_peer_active(peer)); + list_del_init(&peer->rap_list); + + /* lose peerlist's ref */ + kranal_peer_decref(peer); +} + +int +kranal_get_peer_info (int index, ptl_nid_t *nidp, int *portp, int *persistencep) +{ + kra_peer_t *peer; + struct list_head *ptmp; + int i; + + read_lock(&kranal_data.kra_global_lock); + + for (i = 0; i < kranal_data.kra_peer_hash_size; i++) { + + list_for_each(ptmp, &kranal_data.kra_peers[i]) { + + peer = list_entry(ptmp, kra_peer_t, rap_list); + LASSERT (peer->rap_persistence > 0 || + !list_empty(&peer->rap_conns)); + + if (index-- > 0) + continue; + + *nidp = peer->rap_nid; + *portp = peer->rap_port; + *persistencep = peer->rap_persistence; + + read_unlock(&kranal_data.kra_global_lock); + return 0; + } + } + + read_unlock(&kranal_data.kra_global_lock); + return -ENOENT; +} + +int +kranal_add_persistent_peer (ptl_nid_t nid, __u32 ip, int port) +{ + unsigned long flags; + kra_peer_t *peer; + kra_peer_t *peer2; + + if (nid == PTL_NID_ANY) + return -EINVAL; + + peer = kranal_create_peer(nid); + if (peer == NULL) + return -ENOMEM; + + write_lock_irqsave(&kranal_data.kra_global_lock, flags); + + peer2 = kranal_find_peer_locked(nid); + if (peer2 != NULL) { + kranal_put_peer(peer); + peer = peer2; + } else { + /* peer table takes existing ref on peer */ + list_add_tail(&peer->rap_list, + kranal_nid2peerlist(nid)); + } + + peer->rap_ip = ip; + peer->rap_port = port; + peer->rap_persistence++; + + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); + return 0; +} + +void +kranal_del_peer_locked (kra_peer_t *peer, int single_share) +{ + struct list_head *ctmp; + struct list_head *cnxt; + kra_conn_t *conn; + + if (!single_share) + peer->rap_persistence = 0; + else if (peer->rap_persistence > 0) + peer->rap_persistence--; + + if (peer->rap_persistence != 0) + return; + + if (list_empty(&peer->rap_conns)) { + kranal_unlink_peer_locked(peer); + } else { + list_for_each_safe(ctmp, cnxt, &peer->rap_conns) { + conn = list_entry(ctmp, kra_conn_t, rac_list); + + kranal_close_conn_locked(conn, 0); + } + /* peer unlinks itself when last conn is closed */ + } +} + +int +kranal_del_peer (ptl_nid_t nid, int single_share) +{ + unsigned long flags; + struct list_head *ptmp; + struct list_head *pnxt; + kra_peer_t *peer; + int lo; + int hi; + int i; + int rc = -ENOENT; + + write_lock_irqsave(&kranal_data.kra_global_lock, flags); + + if (nid != PTL_NID_ANY) + lo = hi = kranal_nid2peerlist(nid) - kranal_data.kra_peers; + else { + lo = 0; + hi = 
kranal_data.kra_peer_hash_size - 1; + } + + for (i = lo; i <= hi; i++) { + list_for_each_safe (ptmp, pnxt, &kranal_data.kra_peers[i]) { + peer = list_entry(ptmp, kra_peer_t, rap_list); + LASSERT (peer->rap_persistence > 0 || + !list_empty(&peer->rap_conns)); + + if (!(nid == PTL_NID_ANY || peer->rap_nid == nid)) + continue; + + kranal_del_peer_locked(peer, single_share); + rc = 0; /* matched something */ + + if (single_share) + goto out; + } + } + out: + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); + + return rc; +} + +kra_conn_t * +kranal_get_conn_by_idx (int index) +{ + kra_peer_t *peer; + struct list_head *ptmp; + kra_conn_t *conn; + struct list_head *ctmp; + int i; + + read_lock (&kranal_data.kra_global_lock); + + for (i = 0; i < kranal_data.kra_peer_hash_size; i++) { + list_for_each (ptmp, &kranal_data.kra_peers[i]) { + + peer = list_entry(ptmp, kra_peer_t, rap_list); + LASSERT (peer->rap_persistence > 0 || + !list_empty(&peer->rap_conns)); + + list_for_each (ctmp, &peer->rap_conns) { + if (index-- > 0) + continue; + + conn = list_entry(ctmp, kra_conn_t, rac_list); + CDEBUG(D_NET, "++conn[%p] -> "LPX64" (%d)\n", + conn, conn->rac_peer->rap_nid, + atomic_read(&conn->rac_refcount)); + atomic_inc(&conn->rac_refcount); + read_unlock(&kranal_data.kra_global_lock); + return conn; + } + } + } + + read_unlock(&kranal_data.kra_global_lock); + return NULL; +} + +int +kranal_close_peer_conns_locked (kra_peer_t *peer, int why) +{ + kra_conn_t *conn; + struct list_head *ctmp; + struct list_head *cnxt; + int count = 0; + + list_for_each_safe (ctmp, cnxt, &peer->rap_conns) { + conn = list_entry(ctmp, kra_conn_t, rac_list); + + count++; + kranal_close_conn_locked(conn, why); + } + + return count; +} + +int +kranal_close_stale_conns_locked (kra_peer_t *peer, __u64 incarnation) +{ + kra_conn_t *conn; + struct list_head *ctmp; + struct list_head *cnxt; + int count = 0; + + list_for_each_safe (ctmp, cnxt, &peer->rap_conns) { + conn = list_entry(ctmp, kra_conn_t, rac_list); + + if (conn->rac_incarnation == incarnation) + continue; + + CDEBUG(D_NET, "Closing stale conn nid:"LPX64" incarnation:"LPX64"("LPX64")\n", + peer->rap_nid, conn->rac_incarnation, incarnation); + LASSERT (conn->rac_incarnation < incarnation); + + count++; + kranal_close_conn_locked(conn, -ESTALE); + } + + return count; +} + +int +kranal_close_matching_conns (ptl_nid_t nid) +{ + unsigned long flags; + kra_peer_t *peer; + struct list_head *ptmp; + struct list_head *pnxt; + int lo; + int hi; + int i; + int count = 0; + + write_lock_irqsave(&kranal_data.kra_global_lock, flags); + + if (nid != PTL_NID_ANY) + lo = hi = kranal_nid2peerlist(nid) - kranal_data.kra_peers; + else { + lo = 0; + hi = kranal_data.kra_peer_hash_size - 1; + } + + for (i = lo; i <= hi; i++) { + list_for_each_safe (ptmp, pnxt, &kranal_data.kra_peers[i]) { + + peer = list_entry(ptmp, kra_peer_t, rap_list); + LASSERT (peer->rap_persistence > 0 || + !list_empty(&peer->rap_conns)); + + if (!(nid == PTL_NID_ANY || nid == peer->rap_nid)) + continue; + + count += kranal_close_peer_conns_locked(peer, 0); + } + } + + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); + + /* wildcards always succeed */ + if (nid == PTL_NID_ANY) + return 0; + + return (count == 0) ? 
-ENOENT : 0; +} + +int +kranal_cmd(struct portals_cfg *pcfg, void * private) +{ + int rc = -EINVAL; + + LASSERT (pcfg != NULL); + + switch(pcfg->pcfg_command) { + case NAL_CMD_GET_PEER: { + ptl_nid_t nid = 0; + __u32 ip = 0; + int port = 0; + int share_count = 0; + + rc = kranal_get_peer_info(pcfg->pcfg_count, + &nid, &ip, &port, &share_count); + pcfg->pcfg_nid = nid; + pcfg->pcfg_size = 0; + pcfg->pcfg_id = ip; + pcfg->pcfg_misc = port; + pcfg->pcfg_count = 0; + pcfg->pcfg_wait = share_count; + break; + } + case NAL_CMD_ADD_PEER: { + rc = kranal_add_persistent_peer(pcfg->pcfg_nid, + pcfg->pcfg_id, /* IP */ + pcfg->pcfg_misc); /* port */ + break; + } + case NAL_CMD_DEL_PEER: { + rc = kranal_del_peer(pcfg->pcfg_nid, + /* flags == single_share */ + pcfg->pcfg_flags != 0); + break; + } + case NAL_CMD_GET_CONN: { + kra_conn_t *conn = kranal_get_conn_by_idx(pcfg->pcfg_count); + + if (conn == NULL) + rc = -ENOENT; + else { + rc = 0; + pcfg->pcfg_nid = conn->rac_peer->rap_nid; + pcfg->pcfg_id = 0; + pcfg->pcfg_misc = 0; + pcfg->pcfg_flags = 0; + kranal_put_conn(conn); + } + break; + } + case NAL_CMD_CLOSE_CONNECTION: { + rc = kranal_close_matching_conns(pcfg->pcfg_nid); + break; + } + case NAL_CMD_REGISTER_MYNID: { + if (pcfg->pcfg_nid == PTL_NID_ANY) + rc = -EINVAL; + else + rc = kranal_set_mynid(pcfg->pcfg_nid); + break; + } + } + + return rc; +} + +void +kranal_free_txdescs(struct list_head *freelist) +{ + kra_tx_t *tx; + + while (!list_empty(freelist)) { + tx = list_entry(freelist->next, kra_tx_t, tx_list); + + list_del(&tx->tx_list); + PORTAL_FREE(tx->tx_phys, PTL_MD_MAX_IOV * sizeof(*tx->tx_phys)); + PORTAL_FREE(tx, sizeof(*tx)); + } +} + +int +kranal_alloc_txdescs(struct list_head *freelist, int n) +{ + int isnblk = (freelist == &kranal_data.kra_idle_nblk_txs); + int i; + kra_tx_t *tx; + + LASSERT (freelist == &kranal_data.kra_idle_txs || + freelist == &kranal_data.kra_idle_nblk_txs); + LASSERT (list_empty(freelist)); + + for (i = 0; i < n; i++) { + + PORTAL_ALLOC(tx, sizeof(*tx)); + if (tx == NULL) { + CERROR("Can't allocate %stx[%d]\n", + isnblk ? "nblk ", i); + kranal_free_txdescs(); + return -ENOMEM; + } + + PORTAL_ALLOC(tx->tx_phys, + PLT_MD_MAX_IOV * sizeof(*tx->tx_phys)); + if (tx->tx_phys == NULL) { + CERROR("Can't allocate %stx[%d]->tx_phys\n", + isnblk ? 
"nblk ", i); + + PORTAL_FREE(tx, sizeof(*tx)); + kranal_free_txdescs(freelist); + return -ENOMEM; + } + + tx->tx_isnblk = isnblk + tx->tx_buftype = RANAL_BUF_NONE; + + list_add(&tx->tx_list, freelist); + } + + return 0; +} + +int +kranal_device_init(int id, kra_device_t *dev) +{ + const int total_ntx = RANAL_NTX + RANAL_NTX_NBLK; + RAP_RETURN rrc; + + dev->rad_id = id; + rrc = RapkGetDeviceByIndex(id, NULL, kranal_device_callback, + &dev->rad_handle); + if (rrc != RAP_SUCCESS) { + CERROR("Can't get Rapidarray Device %d: %d\n", idx, rrc); + goto failed_0; + } + + rrc = RapkReserveRdma(dev->rad_handle, total_ntx); + if (rrc != RAP_SUCCESS) { + CERROR("Can't reserve %d RDMA descriptors" + " for device[%d]: %d\n", total_ntx, i, rrc); + goto failed_1; + } + + rrc = RapkCreatePtag(dev->rad_handle, + &dev->rad_ptag); + if (rrc != RAP_SUCCESS) { + CERROR("Can't create ptag" + " for device[%d]: %d\n", i, rrc); + goto failed_1; + } + + rrc = RapkCreateCQ(dev->rad_handle, total_ntx, dev->rad_ptag, + &dev->rad_rdma_cq); + if (rrc != RAP_SUCCESS) { + CERROR("Can't create rdma cq size %d" + " for device[%d]: %d\n", total_ntx, i, rrc); + goto failed_2; + } + + rrc = RapkCreateCQ(dev->rad_handle, RANAL_FMA_CQ_SIZE, + dev->rad_ptag, &dev->rad_fma_cq); + if (rrc != RAP_SUCCESS) { + CERROR("Can't create fma cq size %d" + " for device[%d]: %d\n", RANAL_RX_CQ_SIZE, i, rrc); + goto failed_3; + } + + return 0; + + failed_3: + RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag); + failed_2: + RapkDestroyPtag(dev->rad_handle, dev->rad_ptag); + failed_1: + RapkReleaseDevice(dev->rad_handle); + failed_0: + return -ENODEV; +} + +void +kranal_device_fini(kra_device_t *dev) +{ + RapkDestroyCQ(dev->rad_handle, dev->rad_rx_cq, dev->rad_ptag); + RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag); + RapkDestroyPtag(dev->rad_handle, dev->rad_ptag); + RapkReleaseDevice(dev->rad_handle); +} + +void +kranal_api_shutdown (nal_t *nal) +{ + int i; + int rc; + unsigned long flags; + + if (nal->nal_refct != 0) { + /* This module got the first ref */ + PORTAL_MODULE_UNUSE; + return; + } + + CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n", + atomic_read(&portal_kmemory)); + + LASSERT (nal == &kranal_api); + + switch (kranal_data.kra_init) { + default: + CERROR("Unexpected state %d\n", kranal_data.kra_init); + LBUG(); + + case RANAL_INIT_ALL: + /* stop calls to nal_cmd */ + libcfs_nal_cmd_unregister(OPENRANAL); + /* No new persistent peers */ + + /* resetting my NID to unadvertises me, removes my + * listener and nukes all current peers */ + kranal_set_mynid(PTL_NID_ANY); + /* no new peers or conns */ + + /* Wait for all peer/conn state to clean up */ + i = 2; + while (atomic_read(&kranal_data.kra_nconns) != 0 || + atomic_read(&kranal-data.kra_npeers) != 0) { + i++; + CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? 
*/ + "waiting for %d peers and %d conns to close down\n", + atomic_read(&kranal_data.kra_npeers), + atomic_read(&kranal_data.kra_nconns)); + kranal_pause(HZ); + } + /* fall through */ + + case RANAL_INIT_LIB: + lib_fini(&kranal_lib); + /* fall through */ + + case RANAL_INIT_DATA: + break; + } + + /* flag threads to terminate; wake and wait for them to die */ + kranal_data.kra_shutdown = 1; + + for (i = 0; i < kranal_data.kra_ndevs; i++) { + kra_device_t *dev = &kranal_data.kra_devices[i]; + + LASSERT (list_empty(&dev->rad_connq)); + + spin_lock_irqsave(&dev->rad_lock, flags); + wake_up(&dev->rad_waitq); + spin_unlock_irqrestore(&dev->rad_lock, flags); + } + + spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags); + wake_up_all(&kranal_data.kra_reaper_waitq); + spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags); + + LASSERT (list_empty(&kranal_data.kra_connd_peers)); + spin_lock_irqsave(&kranal-data.kra_connd_lock, flags); + wake_up_all(&kranal_data.kra_connd_waitq); + spin_unlock_irqrestore(&kranal-data.kra_connd_lock, flags); + + i = 2; + while (atomic_read(&kranal_data.kra_nthreads) != 0) { + i++; + CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */ + "Waiting for %d threads to terminate\n", + atomic_read(&kranal_data.kra_nthreads)); + kranal_pause(HZ); + } + + LASSERT (atomic_read(&kranal_data.kra_npeers) == 0); + if (kranal_data.kra_peers != NULL) { + for (i = 0; i < kranal_data.kra_peer_hash_size; i++) + LASSERT (list_empty(&kranal_data.kra_peers[i])); + + PORTAL_FREE(kranal_data.kra_peers, + sizeof (struct list_head) * + kranal_data.kra_peer_hash_size); + } + + LASSERT (atomic_read(&kranal_data.kra_nconns) == 0); + if (kranal_data.kra_conns != NULL) { + for (i = 0; i < kranal_data.kra_conn_hash_size; i++) + LASSERT (list_empty(&kranal_data.kra_conns[i])); + + PORTAL_FREE(kranal_data.kra_conns, + sizeof (struct list_head) * + kranal_data.kra_conn_hash_size); + } + + for (i = 0; i < kranal_data.kra_ndevs; i++) + kranal_device_fini(&kranal_data.kra_devices[i]); + + kranal_free_txdescs(&kranal_data.kra_idle_txs); + kranal_free_txdescs(&kranal_data.kra_idle_nblk_txs); + + CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n", + atomic_read(&portal_kmemory)); + printk(KERN_INFO "Lustre: RapidArray NAL unloaded (final mem %d)\n", + atomic_read(&portal_kmemory)); + + kranal_data.kra_init = RANAL_INIT_NOTHING; +} + +int +kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid, + ptl_ni_limits_t *requested_limits, + ptl_ni_limits_t *actual_limits) +{ + static int device_ids[] = {RAPK_MAIN_DEVICE_ID, + RAPK_EXPANSION_DEVICE_ID}; + struct timeval tv; + ptl_process_id_t process_id; + int pkmem = atomic_read(&portal_kmemory); + int rc; + int i; + kra_device_t *dev; + + LASSERT (nal == &kranal_api); + + if (nal->nal_refct != 0) { + if (actual_limits != NULL) + *actual_limits = kranal_lib.libnal_ni.ni_actual_limits; + /* This module got the first ref */ + PORTAL_MODULE_USE; + return PTL_OK; + } + + LASSERT (kranal_data.kra_init == RANAL_INIT_NOTHING); + + memset(&kranal_data, 0, sizeof(kranal_data)); /* zero pointers, flags etc */ + + /* CAVEAT EMPTOR: Every 'Fma' message includes the sender's NID and + * a unique (for all time) incarnation so we can uniquely identify + * the sender. The incarnation is an incrementing counter + * initialised with seconds + microseconds at startup time. So we + * rely on NOT creating connections more frequently on average than + * 1MHz to ensure we don't use old incarnations when we reboot. 
*/ + do_gettimeofday(&tv); + kranal_data.kra_next_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec; + + init_MUTEX(&kranal_data.kra_nid_mutex); + init_MUTEX_LOCKED(&kranal_data.kra_listener_signal); + + rwlock_init(&kranal_data.kra_global_lock); + + for (i = 0; i < RANAL_MAXDEVS; i++ ) { + kra_device_t *dev = &kranal_data.kra_devices[i]; + + dev->rad_idx = i; + INIT_LIST_HEAD(&dev->rad_connq); + init_waitqueue_head(&dev->rad_waitq); + spin_lock_init(&dev->rad_lock); + } + + init_waitqueue_head(&kranal_data.kra_reaper_waitq); + spin_lock_init(&kranal_data.kra_reaper_lock); + + INIT_LIST_HEAD(&kranal_data.kra_connd_peers); + init_waitqueue_head(&kranal_data.kra_connd_waitq); + spin_lock_init(&kranal_data.kra_connd_lock); + + INIT_LIST_HEAD(&kranal_data.kra_idle_txs); + INIT_LIST_HEAD(&kranal_data.kra_idle_nblk_txs); + init_waitqueue_head(&kranal_data.kra_idle_tx_waitq); + spin_lock_init(&kranal_data.kra_tx_lock); + + /* OK to call kranal_api_shutdown() to cleanup now */ + kranal_data.kra_init = RANAL_INIT_DATA; + + kranal_data.kra_peer_hash_size = RANAL_PEER_HASH_SIZE; + PORTAL_ALLOC(kranal_data.kra_peers, + sizeof(struct list_head) * kranal_data.kra_peer_hash_size); + if (kranal_data.kra_peers == NULL) + goto failed; + + for (i = 0; i < kranal_data.kra_peer_hash_size; i++) + INIT_LIST_HEAD(&kranal_data.kra_peers[i]); + + kranal_data.kra_conn_hash_size = RANAL_PEER_HASH_SIZE; + PORTAL_ALLOC(kranal_data.kra_conns, + sizeof(struct list_head) * kranal_data.kra_conn_hash_size); + if (kranal_data.kra_conns == NULL) + goto failed; + + for (i = 0; i < kranal_data.kra_conn_hash_size; i++) + INIT_LIST_HEAD(&kranal_data.kra_conns[i]); + + rc = kranal_alloc_txdescs(&kranal_data.kra_idle_txs, RANAL_NTX); + if (rc != 0) + goto failed; + + rc = kranal_alloc_txdescs(&kranal_data.kra_idle_nblk_txs,RANAL_NTX_NBLK); + if (rc != 0) + goto failed; + + process_id.pid = requested_pid; + process_id.nid = PTL_NID_ANY; /* don't know my NID yet */ + + rc = lib_init(&kranal_lib, nal, process_id, + requested_limits, actual_limits); + if (rc != PTL_OK) { + CERROR("lib_init failed: error %d\n", rc); + goto failed; + } + + /* lib interface initialised */ + kranal_data.kra_init = RANAL_INIT_LIB; + /*****************************************************/ + + rc = kranal_thread_start(kranal_reaper, NULL); + if (rc != 0) { + CERROR("Can't spawn ranal reaper: %d\n", rc); + goto failed; + } + + for (i = 0; i < RANAL_N_CONND; i++) { + rc = kranal_thread_start(kranal_connd, (void *)i); + if (rc != 0) { + CERROR("Can't spawn ranal connd[%d]: %d\n", + i, rc); + goto failed; + } + } + + LASSERT(kranal_data.kra_ndevs == 0); + for (i = 0; i < sizeof(device_ids)/sizeof(device_ids[0]); i++) { + dev = &kranal_data.kra_devices[kranal_data.kra_ndevs]; + + rc = kranal_device_init(device_ids[i], dev); + if (rc == 0) + kranal_data.kra_ndevs++; + + rc = kranal_thread_start(kranal_scheduler, dev); + if (rc != 0) { + CERROR("Can't spawn ranal scheduler[%d]: %d\n", + i, rc); + goto failed; + } + } + + if (kranal_data.kra_ndevs == 0) + goto failed; + + rc = libcfs_nal_cmd_register(OPENRANAL, &kranal_cmd, NULL); + if (rc != 0) { + CERROR("Can't initialise command interface (rc = %d)\n", rc); + goto failed; + } + + /* flag everything initialised */ + kranal_data.kra_init = RANAL_INIT_ALL; + /*****************************************************/ + + CDEBUG(D_MALLOC, "initial kmem %d\n", atomic_read(&portal_kmemory)); + printk(KERN_INFO "Lustre: RapidArray NAL loaded " + "(initial mem %d)\n", pkmem); + + return PTL_OK; + + failed: + 
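+        /* Once kra_init has reached RANAL_INIT_DATA it is safe to fail here:
+         * kranal_api_shutdown() checks kra_init and the individual pointers
+         * and counters, so it only tears down what was actually set up. */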
kranal_api_shutdown(&kranal_api); + return PTL_FAIL; +} + +void __exit +kranal_module_fini (void) +{ +#ifdef CONFIG_SYSCTL + if (kranal_tunables.kra_sysctl != NULL) + unregister_sysctl_table(kranal_tunables.kra_sysctl); +#endif + PtlNIFini(kranal_ni); + + ptl_unregister_nal(OPENRANAL); +} + +int __init +kranal_module_init (void) +{ + int rc; + + /* the following must be sizeof(int) for + * proc_dointvec/kranal_listener_procint() */ + LASSERT (sizeof(kranal_tunables.kra_timeout) == sizeof(int)); + LASSERT (sizeof(kranal_tunables.kra_listener_timeout) == sizeof(int)); + LASSERT (sizeof(kranal_tunables.kra_backlog) == sizeof(int)); + LASSERT (sizeof(kranal_tunables.kra_port) == sizeof(int)); + LASSERT (sizeof(kranal_tunables.kra_max_immediate) == sizeof(int)); + + kranal_api.nal_ni_init = kranal_api_startup; + kranal_api.nal_ni_fini = kranal_api_shutdown; + + /* Initialise dynamic tunables to defaults once only */ + kranal_tunables.kra_timeout = RANAL_TIMEOUT; + + rc = ptl_register_nal(OPENRANAL, &kranal_api); + if (rc != PTL_OK) { + CERROR("Can't register RANAL: %d\n", rc); + return -ENOMEM; /* or something... */ + } + + /* Pure gateways want the NAL started up at module load time... */ + rc = PtlNIInit(OPENRANAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &kranal_ni); + if (rc != PTL_OK && rc != PTL_IFACE_DUP) { + ptl_unregister_nal(OPENRANAL); + return -ENODEV; + } + +#ifdef CONFIG_SYSCTL + /* Press on regardless even if registering sysctl doesn't work */ + kranal_tunables.kra_sysctl = + register_sysctl_table(kranal_top_ctl_table, 0); +#endif + return 0; +} + +MODULE_AUTHOR("Cluster File Systems, Inc. "); +MODULE_DESCRIPTION("Kernel RapidArray NAL v0.01"); +MODULE_LICENSE("GPL"); + +module_init(kranal_module_init); +module_exit(kranal_module_fini); diff --git a/lnet/klnds/ralnd/ralnd.h b/lnet/klnds/ralnd/ralnd.h new file mode 100644 index 0000000..c134179 --- /dev/null +++ b/lnet/klnds/ralnd/ralnd.h @@ -0,0 +1,438 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2004 Cluster File Systems, Inc. + * Author: Eric Barton + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ + +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#define DEBUG_SUBSYSTEM S_NAL + +#include +#include +#include +#include + +#include + +#if CONFIG_SMP +# define RANAL_N_SCHED num_online_cpus() /* # schedulers */ +#else +# define RANAL_N_SCHED 1 /* # schedulers */ +#endif + +#define RANAL_MAXDEVS 2 /* max # devices RapidArray supports */ + +#define RANAL_N_CONND 4 /* # connection daemons */ + +#define RANAL_MIN_RECONNECT_INTERVAL 1 /* first failed connection retry (seconds)... 
*/ +#define RANAL_MAX_RECONNECT_INTERVAL 60 /* ...exponentially increasing to this */ + +#define RANAL_FMA_PREFIX_LEN 232 /* size of FMA "Prefix" */ +#define RANAL_FMA_MAX_DATA_LEN ((7<<10)-256) /* Max FMA MSG is 7K including prefix */ + +#define RANAL_PEER_HASH_SIZE 101 /* # peer lists */ +#define RANAL_CONN_HASH_SIZE 101 /* # conn lists */ + +#define RANAL_NTX 64 /* # tx descs */ +#define RANAL_NTX_NBLK 256 /* # reserved tx descs */ + +#define RANAL_RX_CQ_SIZE 1024 /* # entries in receive CQ + * (overflow is a performance hit) */ + +#define RANAL_RESCHED 100 /* # scheduler loops before reschedule */ + +#define RANAL_MIN_TIMEOUT 5 /* minimum timeout interval (seconds) */ +#define RANAL_TIMEOUT2KEEPALIVE(t) (((t)+1)/2) /* timeout -> keepalive interval */ + +/* default vals for runtime tunables */ +#define RANAL_TIMEOUT 30 /* comms timeout (seconds) */ +#define RANAL_LISTENER_TIMEOUT 5 /* listener timeout (seconds) */ +#define RANAL_MAX_IMMEDIATE (2<<10) /* biggest immediate payload */ + +typedef struct +{ + int kra_timeout; /* comms timeout (seconds) */ + int kra_listener_timeout; /* max time the listener can block */ + int kra_backlog; /* listener's backlog */ + int kra_port; /* listener's TCP/IP port */ + int kra_max_immediate; /* biggest immediate payload */ + struct ctl_table_header *kra_sysctl; /* sysctl interface */ +} kra_tunables_t; + +typedef struct +{ + RAP_PVOID rad_handle; /* device handle */ + RAP_PROTECTION_HANDLE rad_ptag; /* protection tag */ + RAP_CQ_HANDLE rad_fma_cq; /* FMA (small message) completion queue */ + RAP_CQ_HANDLE rad_rdma_cq; /* rdma completion queue */ + int rad_id; /* device id */ + int rad_idx; /* index in kra_devices */ + int rad_ready; /* set by device callback */ + struct list_head rad_connq; /* connections requiring attention */ + wait_queue_head_t rad_waitq; /* scheduler waits here */ + spinlock_t rad_lock; /* serialise */ +} kra_device_t; + +typedef struct +{ + int kra_init; /* initialisation state */ + int kra_shutdown; /* shut down? 
*/ + atomic_t kra_nthreads; /* # live threads */ + + struct semaphore kra_nid_mutex; /* serialise NID/listener ops */ + struct semaphore kra_listener_signal; /* block for listener startup/shutdown */ + struct socket *kra_listener_sock; /* listener's socket */ + int kra_listener_shutdown; /* ask listener to close */ + + kra_device_t kra_devices[RANAL_MAXDEVS]; /* device/ptag/cq etc */ + int kra_ndevs; /* # devices */ + + rwlock_t kra_global_lock; /* stabilize peer/conn ops */ + + struct list_head *kra_peers; /* hash table of all my known peers */ + int kra_peer_hash_size; /* size of kra_peers */ + atomic_t kra_npeers; /* # peers extant */ + + struct list_head *kra_conns; /* conns hashed by cqid */ + int kra_conn_hash_size; /* size of kra_conns */ + __u64 kra_next_incarnation; /* conn incarnation # generator */ + int kra_next_cqid; /* cqid generator */ + atomic_t kra_nconns; /* # connections extant */ + + long kra_new_min_timeout; /* minimum timeout on any new conn */ + wait_queue_head_t kra_reaper_waitq; /* reaper sleeps here */ + spinlock_t kra_reaper_lock; /* serialise */ + + struct list_head kra_connd_peers; /* peers waiting for a connection */ + wait_queue_head_t kra_connd_waitq; /* connection daemons sleep here */ + spinlock_t kra_connd_lock; /* serialise */ + + struct list_head kra_idle_txs; /* idle tx descriptors */ + struct list_head kra_idle_nblk_txs; /* idle reserved tx descriptors */ + __u64 kra_next_tx_cookie; /* RDMA completion cookie */ + wait_queue_head_t kra_idle_tx_waitq; /* block here for tx descriptor */ + spinlock_t kra_tx_lock; /* serialise */ +} kra_data_t; + +#define RANAL_INIT_NOTHING 0 +#define RANAL_INIT_DATA 1 + +#define RANAL_INIT_ALL 7 + +/************************************************************************ + * Wire message structs. These are sent in sender's byte order + * (i.e. receiver checks magic and flips if required). 
+ */ + +typedef struct kra_connreq /* connection request/response */ +{ /* (sent via socket) */ + __u32 racr_magic; /* I'm an ranal connreq */ + __u16 racr_version; /* this is my version number */ + __u16 racr_devid; /* which device to connect on */ + __u64 racr_nid; /* my NID */ + __u64 racr_incarnation; /* my incarnation */ + __u32 racr_timeout; /* my timeout */ + RAP_RI_PARAMETERS racr_riparams; /* my endpoint info */ +} kra_connreq_t; + +typedef struct +{ + RAP_MEM_KEY rard_key; + RAP_PVOID64 rard_addr; + RAP_UINT32 rard_nob; +} kra_rdma_desc_t; + +typedef struct +{ + ptl_hdr_t raim_hdr; /* portals header */ + /* Portals payload is in FMA "Message Data" */ +} kra_immediate_msg_t; + +typedef struct +{ + ptl_hdr_t raprm_hdr; /* portals header */ + __u64 raprm_cookie; /* opaque completion cookie */ +} kra_putreq_msg_t; + +typedef struct +{ + __u64 rapam_src_cookie; /* reflected completion cookie */ + __u64 rapam_dst_cookie; /* opaque completion cookie */ + kra_rdma_desc_t rapam_desc; /* sender's sink buffer */ +} kra_putack_msg_t; + +typedef struct +{ + ptl_hdr_t ragm_hdr; /* portals header */ + __u64 ragm_cookie; /* opaque completion cookie */ + kra_rdma_desc_t ragm_desc; /* sender's sink buffer */ +} kra_get_msg_t; + +typedef struct +{ + __u64 racm_cookie; /* reflected completion cookie */ +} kra_completion_msg_t; + +typedef struct /* NB must fit in FMA "Prefix" */ +{ + __u32 ram_magic; /* I'm an ranal message */ + __u16 ram_version; /* this is my version number */ + __u16 ram_type; /* msg type */ + __u64 ram_srcnid; /* sender's NID */ + __u64 ram_incarnation; /* sender's connection incarnation */ + union { + kra_immediate_msg_t immediate; + kra_putreq_msg_t putreq; + kra_putack_msg_t putack; + kra_get_msg_t get; + kra_completion_msg_t completion; + } ram_u; + __u32 ram_seq; /* incrementing sequence number */ +} kra_msg_t; + +#define RANAL_MSG_MAGIC 0x0be91b92 /* unique magic */ +#define RANAL_MSG_VERSION 1 /* current protocol version */ + +#define RANAL_MSG_FENCE 0x80 /* fence RDMA */ + +#define RANAL_MSG_NONE 0x00 /* illegal message */ +#define RANAL_MSG_NOOP 0x01 /* empty ram_u (keepalive) */ +#define RANAL_MSG_IMMEDIATE 0x02 /* ram_u.immediate */ +#define RANAL_MSG_PUT_REQ 0x03 /* ram_u.putreq (src->sink) */ +#define RANAL_MSG_PUT_NAK 0x04 /* ram_u.completion (no PUT match: sink->src) */ +#define RANAL_MSG_PUT_ACK 0x05 /* ram_u.putack (PUT matched: sink->src) */ +#define RANAL_MSG_PUT_DONE 0x86 /* ram_u.completion (src->sink) */ +#define RANAL_MSG_GET_REQ 0x07 /* ram_u.get (sink->src) */ +#define RANAL_MSG_GET_NAK 0x08 /* ram_u.completion (no GET match: src->sink) */ +#define RANAL_MSG_GET_DONE 0x89 /* ram_u.completion (src->sink) */ +#define RANAL_MSG_CLOSE 0x8a /* empty ram_u */ + +/***********************************************************************/ + +typedef struct kra_tx /* message descriptor */ +{ + struct list_head tx_list; /* queue on idle_txs/rac_sendq/rac_waitq */ + struct kra_conn *tx_conn; /* owning conn */ + lib_msg_t *tx_libmsg[2]; /* lib msgs to finalize on completion */ + unsigned long tx_qtime; /* when tx started to wait for something */ + int tx_isnblk; /* I'm reserved for non-blocking sends */ + int tx_nob; /* # bytes of payload */ + int tx_buftype; /* payload buffer type */ + void *tx_buffer; /* source/sink buffer */ + int tx_phys_offset; /* first page offset (if phys) */ + int tx_phys_npages; /* # physical pages */ + RAP_PHYS_REGION *tx_phys; /* page descriptors */ + RAP_MEM_KEY tx_map_key; /* mapping key */ + RAP_RDMA_DESCRIPTOR tx_rdma_desc; /* rdma 
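A point worth noting about the message type codes above: the RANAL_MSG_FENCE bit (0x80) is folded into the type value itself, so PUT_DONE (0x86), GET_DONE (0x89) and CLOSE (0x8a) are implicitly fenced while the requests and NAKs are not. The receive path tests this bit to decide whether to wait for RDMA completion before parsing the message. A small standalone sketch of the convention, illustrative only:

#include <stdio.h>

#define RANAL_MSG_FENCE      0x80
#define RANAL_MSG_NOOP       0x01
#define RANAL_MSG_IMMEDIATE  0x02
#define RANAL_MSG_PUT_REQ    0x03
#define RANAL_MSG_PUT_NAK    0x04
#define RANAL_MSG_PUT_ACK    0x05
#define RANAL_MSG_PUT_DONE   0x86
#define RANAL_MSG_GET_REQ    0x07
#define RANAL_MSG_GET_NAK    0x08
#define RANAL_MSG_GET_DONE   0x89
#define RANAL_MSG_CLOSE      0x8a

static const struct { int type; const char *name; } types[] = {
        { RANAL_MSG_NOOP,      "NOOP"      },
        { RANAL_MSG_IMMEDIATE, "IMMEDIATE" },
        { RANAL_MSG_PUT_REQ,   "PUT_REQ"   },
        { RANAL_MSG_PUT_NAK,   "PUT_NAK"   },
        { RANAL_MSG_PUT_ACK,   "PUT_ACK"   },
        { RANAL_MSG_PUT_DONE,  "PUT_DONE"  },
        { RANAL_MSG_GET_REQ,   "GET_REQ"   },
        { RANAL_MSG_GET_NAK,   "GET_NAK"   },
        { RANAL_MSG_GET_DONE,  "GET_DONE"  },
        { RANAL_MSG_CLOSE,     "CLOSE"     },
};

int main(void)
{
        unsigned int i;

        /* fenced messages signal RDMA completion to the receiver */
        for (i = 0; i < sizeof(types)/sizeof(types[0]); i++)
                printf("%-9s 0x%02x  fenced=%d\n", types[i].name, types[i].type,
                       (types[i].type & RANAL_MSG_FENCE) != 0);
        return 0;
}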
descriptor */ + __u64 tx_cookie; /* identify this tx to peer */ + kra_msg_t tx_msg; /* FMA message buffer */ +} kra_tx_t; + +#define RANAL_BUF_NONE 0 /* buffer type not set */ +#define RANAL_BUF_IMMEDIATE 1 /* immediate data */ +#define RANAL_BUF_PHYS_UNMAPPED 2 /* physical: not mapped yet */ +#define RANAL_BUF_PHYS_MAPPED 3 /* physical: mapped already */ +#define RANAL_BUF_VIRT_UNMAPPED 4 /* virtual: not mapped yet */ +#define RANAL_BUF_VIRT_MAPPED 5 /* virtual: mapped already */ + +#define RANAL_TX_IDLE 0x00 /* on freelist */ +#define RANAL_TX_SIMPLE 0x10 /* about to send a simple message */ +#define RANAL_TX_PUTI_REQ 0x20 /* PUT initiator about to send PUT_REQ */ +#define RANAL_TX_PUTI_WAIT_ACK 0x21 /* PUT initiator waiting for PUT_ACK */ +#define RANAL_TX_PUTI_RDMA 0x22 /* PUT initiator waiting for RDMA to complete */ +#define RANAL_TX_PUTI_DONE 0x23 /* PUT initiator about to send PUT_DONE */ +#define RANAL_TX_PUTT_NAK 0x30 /* PUT target about to send PUT_NAK */ +#define RANAL_TX_PUTT_ACK 0x30 /* PUT target about to send PUT_ACK */ +#define RANAL_TX_PUTT_WAIT_DONE 0x31 /* PUT target waiting for PUT_DONE */ +#define RANAL_TX_GETI_REQ 0x40 /* GET initiator about to send GET_REQ */ +#define RANAL_TX_GETI_WAIT_DONE 0x41 /* GET initiator waiting for GET_DONE */ +#define RANAL_TX_GETT_NAK 0x50 /* GET target about to send PUT_NAK */ +#define RANAL_TX_GETT_RDMA 0x51 /* GET target waiting for RDMA to complete */ +#define RANAL_TX_GETT_DONE 0x52 /* GET target about to send GET_DONE */ + +typedef struct kra_conn +{ + struct kra_peer *rac_peer; /* owning peer */ + struct list_head rac_list; /* stash on peer's conn list */ + struct list_head rac_hashlist; /* stash in connection hash table */ + struct list_head rac_schedlist; /* queue for scheduler */ + struct list_head rac_fmaq; /* txs queued for FMA */ + struct list_head rac_rdmaq; /* txs awaiting RDMA completion */ + struct list_head rac_replyq; /* txs awaiting replies */ + __u64 rac_peer_incarnation; /* peer's unique connection stamp */ + __u64 rac_my_incarnation; /* my unique connection stamp */ + unsigned long rac_last_tx; /* when I last sent an FMA message */ + unsigned long rac_last_rx; /* when I last received an FMA messages */ + long rac_keepalive; /* keepalive interval */ + long rac_timeout; /* infer peer death on (last_rx + timout > now) */ + __u32 rac_cqid; /* my completion callback id (non-unique) */ + __u32 rac_tx_seq; /* tx msg sequence number */ + __u32 rac_rx_seq; /* rx msg sequence number */ + atomic_t rac_refcount; /* # users */ + unsigned int rac_close_sent; /* I've sent CLOSE */ + unsigned int rac_close_recvd; /* I've received CLOSE */ + unsigned int rac_closing; /* connection being torn down */ + unsigned int rac_scheduled; /* being attented to */ + spinlock_t rac_lock; /* serialise */ + kra_device_t *rac_device; /* which device */ + RAP_PVOID rac_rihandle; /* RA endpoint */ + kra_msg_t *rac_rxmsg; /* incoming message (FMA prefix) */ + kra_msg_t rac_msg; /* keepalive/CLOSE message buffer */ +} kra_conn_t; + +typedef struct kra_peer +{ + struct list_head rap_list; /* stash on global peer list */ + struct list_head rap_connd_list; /* schedule on kra_connd_peers */ + struct list_head rap_conns; /* all active connections */ + struct list_head rap_tx_queue; /* msgs waiting for a conn */ + ptl_nid_t rap_nid; /* who's on the other end(s) */ + __u32 rap_ip; /* IP address of peer */ + int rap_port; /* port on which peer listens */ + atomic_t rap_refcount; /* # users */ + int rap_persistence; /* "known" peer refs */ + int 
rap_connecting; /* connection forming */ + unsigned long rap_reconnect_time; /* CURRENT_TIME when reconnect OK */ + unsigned long rap_reconnect_interval; /* exponential backoff */ +} kra_peer_t; + + +extern lib_nal_t kranal_lib; +extern kra_data_t kranal_data; +extern kra_tunables_t kranal_tunables; + +static inline void +kranal_peer_addref(kra_peer_t *peer) +{ + CDEBUG(D_NET, "%p->"LPX64"\n", peer, peer->rap_nid); + LASSERT(atomic_read(&peer->rap_refcount) > 0); + atomic_inc(&peer->rap_refcount); +} + +static inline void +kranal_peer_decref(kra_peer_t *peer) +{ + CDEBUG(D_NET, "%p->"LPX64"\n", peer, peer->rap_nid); + LASSERT(atomic_read(&peer->rap_refcount) > 0); + if (atomic_dec_and_test(&peer->rap_refcount)) + __kranal_peer_decref(peer); +} + +static inline struct list_head * +kranal_nid2peerlist (ptl_nid_t nid) +{ + unsigned int hash = ((unsigned int)nid) % kranal_data.kra_peer_hash_size; + + return (&kranal_data.kra_peers [hash]); +} + +static inline int +kranal_peer_active(kra_peer_t *peer) +{ + /* Am I in the peer hash table? */ + return (!list_empty(&peer->rap_list)); +} + +static inline void +kranal_conn_addref(kra_conn_t *conn) +{ + CDEBUG(D_NET, "%p->"LPX64"\n", conn, conn->rac_peer->rap_nid); + LASSERT(atomic_read(&conn->rac_refcount) > 0); + atomic_inc(&conn->rac_refcount); +} + +static inline void +kranal_conn_decref(kra_conn_t *conn) +{ + CDEBUG(D_NET, "%p->"LPX64"\n", conn, conn->rac_peer->rap_nid); + LASSERT(atomic_read(&conn->rac_refcount) > 0); + if (atomic_dec_and_test(&conn->rac_refcount)) + __kranal_conn_decref(conn); +} + +static inline struct list_head * +kranal_cqid2connlist (__u32 cqid) +{ + unsigned int hash = cqid % kranal_data.kra_conn_hash_size; + + return (&kranal_data.kra_conns [hash]); +} + +static inline kra_conn_t * +kranal_cqid2conn_locked (__u32 cqid) +{ + struct list_head conns = kranal_cqid2connlist(cqid); + struct list_head *tmp; + + list_for_each(tmp, conns) { + conn = list_entry(tmp, kra_conn_t, rac_hashlist); + + if (conn->rac_cqid == cqid) + return conn; + } + + return NULL; +} + +static inline int +kranal_tx_mapped (kra_tx_t *tx) +{ + return (tx->tx_buftype == RANAL_BUF_VIRT_MAPPED || + tx->tx_buftype == RANAL_BUF_PHYS_MAPPED); +} + +#if CONFIG_X86 +static inline __u64 +kranal_page2phys (struct page *p) +{ + __u64 page_number = p - mem_map; + + return (page_number << PAGE_SHIFT); +} +#else +# error "no page->phys" +#endif + diff --git a/lnet/klnds/ralnd/ralnd_cb.c b/lnet/klnds/ralnd/ralnd_cb.c new file mode 100644 index 0000000..b491d71 --- /dev/null +++ b/lnet/klnds/ralnd/ralnd_cb.c @@ -0,0 +1,1754 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2004 Cluster File Systems, Inc. + * Author: Eric Barton + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
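The inline helpers above map a NID or cqid to its hash bucket with a plain modulo over the table size (101 buckets by default). A standalone sketch of the same arithmetic, using made-up NID and cqid values purely for illustration:

#include <stdio.h>
#include <stdint.h>

#define RANAL_PEER_HASH_SIZE 101
#define RANAL_CONN_HASH_SIZE 101

/* mirrors kranal_nid2peerlist(): bucket index from the low bits of the NID */
static unsigned int nid2bucket(uint64_t nid)
{
        return ((unsigned int)nid) % RANAL_PEER_HASH_SIZE;
}

/* mirrors kranal_cqid2connlist(): bucket index from the completion queue id */
static unsigned int cqid2bucket(uint32_t cqid)
{
        return cqid % RANAL_CONN_HASH_SIZE;
}

int main(void)
{
        uint64_t nid  = 0x12345678abcdULL;      /* example NID, not a real one */
        uint32_t cqid = 4242;                   /* example cqid */

        printf("nid  %#llx -> peer bucket %u\n",
               (unsigned long long)nid, nid2bucket(nid));
        printf("cqid %u -> conn bucket %u\n", cqid, cqid2bucket(cqid));
        return 0;
}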
+ * + */ + +#include "ranal.h" + +int +kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist) +{ + /* I would guess that if kranal_get_peer (nid) == NULL, + and we're not routing, then 'nid' is very distant :) */ + if ( nal->libnal_ni.ni_pid.nid == nid ) { + *dist = 0; + } else { + *dist = 1; + } + + return 0; +} + +void +kranal_device_callback(RAP_INT32 devid) +{ + kra_device_t *dev; + int i; + + for (i = 0; i < kranal_data.kra_ndevs; i++) { + + dev = &kranal_data.kra_devices[i]; + if (dev->rad_id != devid) + continue; + + spin_lock_irqsave(&dev->rad_lock, flags); + + if (!dev->rad_ready) { + dev->rad_ready = 1; + wake_up(&dev->rad_waitq); + } + + spin_unlock_irqrestore(&dev->rad_lock, flags); + return; + } + + CWARN("callback for unknown device %d\n", devid); +} + +void +kranal_schedule_conn(kra_conn_t *conn) +{ + kra_device_t *dev = conn->rac_device; + unsigned long flags; + + spin_lock_irqsave(&dev->rad_lock, flags); + + if (!conn->rac_scheduled) { + kranal_conn_addref(conn); /* +1 ref for scheduler */ + conn->rac_scheduled = 1; + list_add_tail(&conn->rac_schedlist, &dev->rad_connq); + wake_up(&dev->rad_waitq); + } + + spin_unlock_irqrestore(&dev->rad_lock, flags); +} + +void +kranal_schedule_cqid (__u32 cqid) +{ + kra_conn_t *conn; + struct list_head *conns; + struct list_head *tmp; + + conns = kranal_cqid2connlist(cqid); + + read_lock(&kranal_data.kra_global_lock); + + conn = kranal_cqid2conn_locked(cqid); + + if (conn == NULL) + CWARN("no cqid %x\n", cqid); + else + kranal_schedule_conn(conn); + + read_unlock(&kranal_data.kra_global_lock); +} + +void +kranal_schedule_dev(kra_device_t *dev) +{ + kra_conn_t *conn; + struct list_head *conns; + struct list_head *tmp; + int i; + + /* Don't do this in IRQ context (servers may have 1000s of clients) */ + LASSERT (!in_interrupt()); + + CWARN("Scheduling ALL conns on device %d\n", dev->rad_id); + + for (i = 0; i < kranal_data.kra_conn_hash_size; i++) { + + /* Drop the lock on each hash bucket to ensure we don't + * block anyone for too long at IRQ priority on another CPU */ + + read_lock(&kranal_data.kra_global_lock); + + conns = &kranal_data.kra_conns[i]; + + list_for_each (tmp, conns) { + conn = list_entry(tmp, kra_conn_t, rac_hashlist); + + if (conn->rac_device == dev) + kranal_schedule_conn(conn); + } + read_unlock(&kranal_data.kra_global_lock); + } +} + +void +kranal_tx_done (kra_tx_t *tx, int completion) +{ + ptl_err_t ptlrc = (completion == 0) ? 
PTL_OK : PTL_FAIL; + kra_device_t *dev; + unsigned long flags; + int i; + RAP_RETURN rrc; + + LASSERT (!in_interrupt()); + + switch (tx->tx_buftype) { + default: + LBUG(); + + case RANAL_BUF_NONE: + case RANAL_BUF_IMMEDIATE: + case RANAL_BUF_PHYS_UNMAPPED: + case RANAL_BUF_VIRT_UNMAPPED: + break; + + case RANAL_BUF_PHYS_MAPPED: + LASSERT (tx->tx_conn != NULL); + dev = tx->tx_con->rac_device; + rrc = RapkDeregisterMemory(dev->rad_handle, NULL, + dev->rad_ptag, &tx->tx_map_key); + LASSERT (rrc == RAP_SUCCESS); + break; + + case RANAL_BUF_VIRT_MAPPED: + LASSERT (tx->tx_conn != NULL); + dev = tx->tx_con->rac_device; + rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer + dev->rad_ptag, &tx->tx_map_key); + LASSERT (rrc == RAP_SUCCESS); + break; + } + + for (i = 0; i < 2; i++) { + /* tx may have up to 2 libmsgs to finalise */ + if (tx->tx_libmsg[i] == NULL) + continue; + + lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc); + tx->tx_libmsg[i] = NULL; + } + + tx->tx_buftype = RANAL_BUF_NONE; + tx->tx_msg.ram_type = RANAL_MSG_NONE; + tx->tx_conn = NULL; + + spin_lock_irqsave(&kranal_data.kra_tx_lock, flags); + + if (tx->tx_isnblk) { + list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs); + } else { + list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs); + wake_up(&kranal_data.kra_idle_tx_waitq); + } + + spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags); +} + +kra_tx_t * +kranal_get_idle_tx (int may_block) +{ + unsigned long flags; + kra_tx_t *tx = NULL; + + for (;;) { + spin_lock_irqsave(&kranal_data.kra_tx_lock, flags); + + /* "normal" descriptor is free */ + if (!list_empty(&kranal_data.kra_idle_txs)) { + tx = list_entry(kranal_data.kra_idle_txs.next, + kra_tx_t, tx_list); + break; + } + + if (!may_block) { + /* may dip into reserve pool */ + if (list_empty(&kranal_data.kra_idle_nblk_txs)) { + CERROR("reserved tx desc pool exhausted\n"); + break; + } + + tx = list_entry(kranal_data.kra_idle_nblk_txs.next, + kra_tx_t, tx_list); + break; + } + + /* block for idle tx */ + spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags); + + wait_event(kranal_data.kra_idle_tx_waitq, + !list_empty(&kranal_data.kra_idle_txs)); + } + + if (tx != NULL) { + list_del(&tx->tx_list); + + /* Allocate a new completion cookie. It might not be + * needed, but we've got a lock right now... 
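kranal_get_idle_tx() above draws from two pools: callers that may block wait on kra_idle_tx_waitq for one of the RANAL_NTX normal descriptors, while non-blocking callers may dip into the RANAL_NTX_NBLK reserved descriptors and fail outright once those run out. A simplified single-threaded sketch of that policy (the real code sleeps instead of failing when blocking is allowed):

#include <stdio.h>

#define RANAL_NTX       64
#define RANAL_NTX_NBLK  256

static int idle_txs      = RANAL_NTX;       /* "normal" pool */
static int idle_nblk_txs = RANAL_NTX_NBLK;  /* reserved pool */

/* returns 1 on success; a blocking caller would sleep on the idle-tx
 * waitqueue rather than fail, which this sketch cannot show */
static int get_idle_tx(int may_block)
{
        if (idle_txs > 0) {                 /* normal descriptor free */
                idle_txs--;
                return 1;
        }
        if (!may_block) {                   /* may dip into the reserve */
                if (idle_nblk_txs == 0)
                        return 0;           /* reserved pool exhausted */
                idle_nblk_txs--;
                return 1;
        }
        return 0;                           /* would block here in the driver */
}

int main(void)
{
        int i, got = 0;

        for (i = 0; i < RANAL_NTX + 10; i++)
                got += get_idle_tx(0);      /* non-blocking callers */

        printf("allocated %d descriptors; normal left %d, reserved left %d\n",
               got, idle_txs, idle_nblk_txs);
        return 0;
}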
*/ + tx->tx_cookie = kranal_data.kra_next_tx_cookie++; + + LASSERT (tx->tx_buftype == RANAL_BUF_NONE); + LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE); + LASSERT (tx->tx_conn == NULL); + LASSERT (tx->tx_libmsg[0] == NULL); + LASSERT (tx->tx_libmsg[1] == NULL); + } + + spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags); + + return tx; +} + +void +kranal_init_msg(kra_msg_t *msg, int type) +{ + msg->ram_magic = RANAL_MSG_MAGIC; + msg->ram_version = RANAL_MSG_VERSION; + msg->ram_type = type; + msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid; + /* ram_incarnation gets set when FMA is sent */ +} + +kra_tx_t +kranal_new_tx_msg (int may_block, int type) +{ + kra_tx_t *tx = kranal_get_idle_tx(may_block); + + if (tx == NULL) + return NULL; + + kranal_init_msg(&tx->tx_msg, type); + return tx; +} + +int +kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov, + int offset, int nob) + +{ + LASSERT (nob > 0); + LASSERT (niov > 0); + LASSERT (tx->tx_buftype == RANAL_BUF_NONE); + + while (offset >= iov->iov_len) { + offset -= iov->iov_len; + niov--; + iov++; + LASSERT (niov > 0); + } + + if (nob > iov->iov_len - offset) { + CERROR("Can't handle multiple vaddr fragments\n"); + return -EMSGSIZE; + } + + tx->tx_bufftype = RANAL_BUF_IMMEDIATE; + tx->tx_nob = nob; + tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset); + return 0; +} + +int +kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov, + int offset, int nob) + +{ + LASSERT (nob > 0); + LASSERT (niov > 0); + LASSERT (tx->tx_buftype == RANAL_BUF_NONE); + + while (offset >= iov->iov_len) { + offset -= iov->iov_len; + niov--; + iov++; + LASSERT (niov > 0); + } + + if (nob > iov->iov_len - offset) { + CERROR("Can't handle multiple vaddr fragments\n"); + return -EMSGSIZE; + } + + tx->tx_bufftype = RANAL_BUF_VIRT_UNMAPPED; + tx->tx_nob = nob; + tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset); + return 0; +} + +int +kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov, + int offset, int nob) +{ + RAP_PHYS_REGION *phys = tx->tx_phys; + int resid; + + CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob); + + LASSERT (nob > 0); + LASSERT (nkiov > 0); + LASSERT (tx->tx_buftype == RANAL_BUF_NONE); + + while (offset >= kiov->kiov_len) { + offset -= kiov->kiov_len; + nkiov--; + kiov++; + LASSERT (nkiov > 0); + } + + tx->tx_bufftype = RANAL_BUF_PHYS_UNMAPPED; + tx->tx_nob = nob; + tx->tx_buffer = NULL; + tx->tx_phys_offset = kiov->kiov_offset + offset; + + phys->Address = kranal_page2phys(kiov->kiov_page); + phys->Length = PAGE_SIZE; + phys++; + + resid = nob - (kiov->kiov_len - offset); + while (resid > 0) { + kiov++; + nkiov--; + LASSERT (nkiov > 0); + + if (kiov->kiov_offset != 0 || + ((resid > PAGE_SIZE) && + kiov->kiov_len < PAGE_SIZE)) { + int i; + /* Can't have gaps */ + CERROR("Can't make payload contiguous in I/O VM:" + "page %d, offset %d, len %d \n", nphys, + kiov->kiov_offset, kiov->kiov_len); + + for (i = -nphys; i < nkiov; i++) { + CERROR("kiov[%d] %p +%d for %d\n", + i, kiov[i].kiov_page, + kiov[i].kiov_offset, kiov[i].kiov_len); + } + + return -EINVAL; + } + + if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) { + CERROR ("payload too big (%d)\n", phys - tx->tx_phys); + return -EMSGSIZE; + } + + phys->Address = kranal_page2phys(kiov->kiov_page); + phys->Length = PAGE_SIZE; + phys++; + + resid -= PAGE_SIZE; + } + + tx->tx_phys_npages = phys - tx->tx_phys; + return 0; +} + +static inline int +kranal_setup_buffer (kra_tx_t *tx, int niov, + struct iovec *iov, 
ptl_kiov_t *kiov, + int offset, int nob) +{ + LASSERT ((iov == NULL) != (kiov == NULL)); + + if (kiov != NULL) + return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob); + + return kranal_setup_virt_buffer(tx, niov, kiov, offset, nob); +} + +void +kranal_map_buffer (kra_tx_t *tx) +{ + kra_conn_t *conn = tx->tx_conn; + kra_device_t *dev = conn->rac_device; + + switch (tx->tx_buftype) { + default: + + case RANAL_BUF_PHYS_UNMAPPED: + rrc = RapkRegisterPhys(conn->rac_device->rad_handle, + tx->tx_phys, tx->tx_phys_npages, + conn->rac_device->rad_ptag, + &tx->tx_map_key); + LASSERT (rrc == RAP_SUCCESS); + tx->tx_buftype = RANAL_BUF_PHYS_MAPPED; + return; + + case RANAL_BUF_VIRT_UNMAPPED: + rrc = RapkRegisterMemory(conn->rac_device->rad_handle, + tx->tx_buffer, tx->tx_nob, + conn->rac_device->rad_ptag, + &tx->tx_map_key); + LASSERT (rrc == RAP_SUCCESS); + tx->tx_buftype = RANAL_BUF_VIRT_MAPPED; + return; + } +} + +kra_conn_t * +kranal_find_conn_locked (kra_peer_t *peer) +{ + struct list_head *tmp; + + /* just return the first connection */ + list_for_each (tmp, &peer->rap_conns) { + return list_entry(tmp, kra_conn_t, rac_list); + } + + return NULL; +} + +void +kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx) +{ + unsigned long flags; + + tx->tx_conn = conn; + + spin_lock_irqsave(&conn->rac_lock, flags); + list_add_tail(&tx->tx_list, &conn->rac_fmaq); + tx->tx_qtime = jiffies; + spin_unlock_irqrestore(&conn->rac_lock, flags); + + kranal_schedule_conn(conn); +} + +void +kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid) +{ + unsigned long flags; + kra_peer_t *peer; + kra_conn_t *conn; + unsigned long now; + rwlock_t *g_lock = &kranal_data.kra_global_lock; + + /* If I get here, I've committed to send, so I complete the tx with + * failure on any problems */ + + LASSERT (tx->tx_conn == NULL); /* only set when assigned a conn */ + + read_lock(g_lock); + + peer = kranal_find_peer_locked(nid); + if (peer == NULL) { + read_unlock(g_lock); + kranal_tx_done(tx, -EHOSTUNREACH); + return; + } + + conn = kranal_find_conn_locked(peer); + if (conn != NULL) { + kranal_post_fma(conn, tx); + read_unlock(g_lock); + return; + } + + /* Making one or more connections; I'll need a write lock... */ + read_unlock(g_lock); + write_lock_irqsave(g_lock, flags); + + peer = kranal_find_peer_locked(nid); + if (peer == NULL) { + write_unlock_irqrestore(g_lock, flags); + kranal_tx_done(tx -EHOSTUNREACH); + return; + } + + conn = kranal_find_conn_locked(peer); + if (conn != NULL) { + /* Connection exists; queue message on it */ + kranal_post_fma(conn, tx); + write_unlock_irqrestore(g_lock, flags); + return; + } + + LASSERT (peer->rap_persistence > 0); + + if (!peer->rap_connecting) { + now = CURRENT_TIME; + if (now < peer->rap_reconnect_time) { + write_unlock_irqrestore(g_lock, flags); + kranal_tx_done(tx, -EHOSTUNREACH); + return; + } + + peer->rap_connecting = 1; + kranal_peer_addref(peer); /* extra ref for connd */ + + spin_lock(&kranal_data.kra_connd_lock); + + list_add_tail(&peer->rap_connd_list, + &kranal_data.kra_connd_peers); + wake_up(&kranal_data.kra_connd_waitq); + + spin_unlock(&kranal_data.kra_connd_lock); + } + + /* A connection is being established; queue the message... 
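kranal_launch_tx() above shows the usual lock-upgrade idiom: look the peer and connection up under the shared kra_global_lock, and only when a connection must be initiated drop the read lock, take the write lock, and repeat the lookups, because the state may have changed in between. A userspace analogue using a pthread rwlock, illustrative only; the flag below stands in for kranal_find_conn_locked():

#include <pthread.h>
#include <stdio.h>

static pthread_rwlock_t g_lock = PTHREAD_RWLOCK_INITIALIZER;
static int have_conn;          /* stands in for kranal_find_conn_locked() */

static void launch(void)
{
        pthread_rwlock_rdlock(&g_lock);
        if (have_conn) {                        /* fast path: queue on conn */
                printf("queued under read lock\n");
                pthread_rwlock_unlock(&g_lock);
                return;
        }
        pthread_rwlock_unlock(&g_lock);

        /* Need to modify peer state: re-acquire as a writer and repeat the
         * lookup, since another thread may have created the connection
         * while the lock was dropped. */
        pthread_rwlock_wrlock(&g_lock);
        if (have_conn) {
                printf("conn appeared while unlocked; queued\n");
        } else {
                have_conn = 1;                  /* mark "connecting" */
                printf("no conn; initiating connect under write lock\n");
        }
        pthread_rwlock_unlock(&g_lock);
}

int main(void)
{
        launch();       /* first call takes the slow path */
        launch();       /* second call finds the conn under the read lock */
        return 0;
}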
*/ + list_add_tail(&tx->tx_list, &peer->rap_tx_queue); + + write_unlock_irqrestore(g_lock, flags); +} + +static void +kranal_rdma(kra_tx_t *tx, int type, + kra_rdma_desc_t *rard, int nob, __u64 cookie) +{ + kra_conn_t *conn = tx->tx_conn; + RAP_RETURN rrc; + + /* prep final completion message */ + kranal_init_msg(&tx->tx_msg, type); + tx->tx_msg.ram_u.completion.racm_cookie = cookie; + + LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED || + tx->tx_buftype == RANAL_BUF_VIRT_MAPPED); + LASSERT (nob <= rard->rard_nob); + + memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc)); + tx->tx_rdma_desc.SrcPtr = tx->tx_buffer; + tx->tx_rdma_desc.SrcKey = tx->tx_map_key; + tx->tx_rdma_desc.DstPtr = rard->rard_addr; + tx->tx_rdma_desc.DstKey = rard->rard_key; + tx->tx_rdma_desc.Length = nob; + tx->tx_rdma_desc.AppPtr = tx; + + if (nob == 0) { /* Immediate completion */ + kranal_post_fma(conn, tx); + return; + } + + rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc); + LASSERT (rrc == RAP_SUCCESS); + + spin_lock_irqsave(&conn->rac_lock, flags); + list_add_tail(&tx->tx_list, &conn->rac_rdmaq); + tx->tx_qtime = jiffies; + spin_unlock_irqrestore(&conn->rac_lock, flags); +} + +int +kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob) +{ + __u32 nob_received = nob; + RAP_RETURN rrc; + + LASSERT (conn->rac_rxmsg != NULL); + + rrc = RapkFmaCopyToUser(conn->rac_rihandle, buffer, + &nob_received, sizeof(kra_msg_t)); + LASSERT (rrc == RAP_SUCCESS); + + conn->rac_rxmsg = NULL; + + if (nob_received != nob) { + CWARN("Expected %d immediate bytes but got %d\n", + nob, nob_received); + return -EPROTO; + } + + return 0; +} + +ptl_err_t +kranal_do_send (lib_nal_t *nal, + void *private, + lib_msg_t *libmsg, + ptl_hdr_t *hdr, + int type, + ptl_nid_t nid, + ptl_pid_t pid, + unsigned int niov, + struct iovec *iov, + ptl_kiov_t *kiov, + size_t offset, + size_t nob) +{ + kra_conn_t *conn; + kra_tx_t *tx; + + /* NB 'private' is different depending on what we're sending.... */ + + CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64 + " pid %d\n", nob, niov, nid , pid); + + LASSERT (nob == 0 || niov > 0); + LASSERT (niov <= PTL_MD_MAX_IOV); + + LASSERT (!in_interrupt()); + /* payload is either all vaddrs or all pages */ + LASSERT (!(kiov != NULL && iov != NULL)); + + switch(type) { + default: + LBUG(); + + case PTL_MSG_REPLY: { + /* reply's 'private' is the conn that received the GET_REQ */ + conn = private; + LASSERT (conn->rac_rxmsg != NULL); + + if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) { + if (nob > RANAL_MAX_IMMEDIATE) { + CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n", + nob, nid); + return PTL_FAIL; + } + break; /* RDMA not expected */ + } + + /* Incoming message consistent with immediate reply? 
*/ + if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) { + CERROR("REPLY to "LPX64" bad msg type %x!!!\n", + nid, conn->rac_rxmsg->ram_type); + return PTL_FAIL; + } + + tx = kranal_get_idle_tx(0); + if (tx == NULL) + return PTL_FAIL; + + rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob); + if (rc != 0) { + kranal_tx_done(tx, rc); + return PTL_FAIL; + } + + tx->tx_conn = conn; + tx->tx_libmsg[0] = libmsg; + + kranal_map_buffer(tx); + kranal_rdma(tx, RANAL_MSG_GET_DONE, + &conn->rac_rxmsg->ram_u.getreq.ragm_desc, nob, + &conn->rac_rxmsg->ram_u.getreq.ragm_cookie); + return PTL_OK; + } + + case PTL_MSG_GET: + if (kiov == NULL && /* not paged */ + nob <= RANAL_MAX_IMMEDIATE && /* small enough */ + nob <= kranal_tunables.kra_max_immediate) + break; /* send IMMEDIATE */ + + tx = kranal_new_tx_msg(0, RANAL_MSG_GET_REQ); + if (tx == NULL) + return PTL_NO_SPACE; + + rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob); + if (rc != 0) { + kranal_tx_done(tx, rc); + return PTL_FAIL; + } + + tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg); + if (tx->tx_libmsg[1] == NULL) { + CERROR("Can't create reply for GET to "LPX64"\n", nid); + kranal_tx_done(tx, rc); + return PTL_FAIL; + } + + tx->tx_libmsg[0] = libmsg; + tx->tx_msg.ram_u.get.ragm_hdr = *hdr; + /* rest of tx_msg is setup just before it is sent */ + kranal_launch_tx(tx, nid); + return PTL_OK + + case PTL_MSG_ACK: + LASSERT (nob == 0); + break; + + case PTL_MSG_PUT: + if (kiov == NULL && /* not paged */ + nob <= RANAL_MAX_IMMEDIATE && /* small enough */ + nob <= kranal_tunables.kra_max_immediate) + break; /* send IMMEDIATE */ + + tx = kranal_new_tx_msg(!in_interrupt(), RANA_MSG_PUT_REQ); + if (tx == NULL) + return PTL_NO_SPACE; + + rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob); + if (rc != 0) { + kranal_tx_done(tx, rc); + return PTL_FAIL; + } + + tx->tx_libmsg[0] = libmsg; + tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr; + /* rest of tx_msg is setup just before it is sent */ + kranal_launch_tx(tx, nid); + return PTL_OK; + } + + LASSERT (kiov == NULL); + LASSERT (nob <= RANAL_MAX_IMMEDIATE); + + tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK || + type == PTL_MSG_REPLY || + in_interrupt()), + RANAL_MSG_IMMEDIATE); + if (tx == NULL) + return PTL_NO_SPACE; + + rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob); + if (rc != 0) { + kranal_tx_done(tx, rc); + return PTL_FAIL; + } + + tx->tx_msg.ram_u.immediate.raim_hdr = *hdr; + tx->tx_libmsg[0] = libmsg; + kranal_launch_tx(tx, nid); + return PTL_OK; +} + +ptl_err_t +kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie, + ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, + unsigned int niov, struct iovec *iov, + size_t offset, size_t len) +{ + return kranal_do_send(nal, private, cookie, + hdr, type, nid, pid, + niov, iov, NULL, + offset, len); +} + +ptl_err_t +kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, + ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, + unsigned int niov, ptl_kiov_t *kiov, + size_t offset, size_t len) +{ + return kranal_do_send(nal, private, cookie, + hdr, type, nid, pid, + niov, NULL, kiov, + offset, len); +} + +ptl_err_t +kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg, + unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov, + size_t offset, size_t mlen, size_t rlen) +{ + kra_conn_t *conn = private; + kra_msg_t *rxmsg = conn->rac_rxmsg; + void *buffer; + int rc; + + LASSERT (mlen <= rlen); + LASSERT (!in_interrupt()); + /* Either all pages or all vaddrs */ + LASSERT 
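The PUT and GET cases above choose between the IMMEDIATE path and an RDMA request with the same three-part test: the payload must not be paged (kiov == NULL) and must fit both the compile-time RANAL_MAX_IMMEDIATE cap and the kra_max_immediate runtime tunable. A standalone sketch of that decision:

#include <stdio.h>
#include <stddef.h>

#define RANAL_MAX_IMMEDIATE (2<<10)              /* 2048: compile-time cap */

static int kra_max_immediate = RANAL_MAX_IMMEDIATE;   /* runtime tunable */

/* mirrors the test in kranal_do_send(): 1 => IMMEDIATE, 0 => RDMA request */
static int send_immediate(const void *kiov, size_t nob)
{
        return kiov == NULL &&                   /* not paged */
               nob <= RANAL_MAX_IMMEDIATE &&     /* small enough for FMA */
               nob <= (size_t)kra_max_immediate; /* within the tunable */
}

int main(void)
{
        int page = 1;                            /* stands in for a ptl_kiov_t */

        printf("512B vaddr -> %s\n", send_immediate(NULL, 512)  ? "IMMEDIATE" : "RDMA");
        printf("8KB  vaddr -> %s\n", send_immediate(NULL, 8192) ? "IMMEDIATE" : "RDMA");
        printf("512B paged -> %s\n", send_immediate(&page, 512) ? "IMMEDIATE" : "RDMA");
        return 0;
}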
(!(kiov != NULL && iov != NULL)); + + switch(rxmsg->ram_type) { + default: + LBUG(); + return PTL_FAIL; + + case RANAL_MSG_IMMEDIATE: + if (mlen == 0) { + buffer = NULL; + } else if (kiov != NULL) { + CERROR("Can't recv immediate into paged buffer\n"); + return PTL_FAIL; + } else { + LASSERT (niov > 0); + while (offset >= iov->iov_len) { + offset -= iov->iov_len; + iov++; + niov--; + LASSERT (niov > 0); + } + if (mlen > iov->iov_len - offset) { + CERROR("Can't handle immediate frags\n"); + return PTL_FAIL; + } + buffer = ((char *)iov->iov_base) + offset; + } + rc = kranal_consume_rxmsg(conn, buffer, mlen); + lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL); + return PTL_OK; + + case RANAL_MSG_GET_REQ: + /* If the GET matched, we've already handled it in + * kranal_do_send which is called to send the REPLY. We're + * only called here to complete the GET receive (if we needed + * it which we don't, but I digress...) */ + LASSERT (libmsg == NULL); + lib_finalize(nal, NULL, libmsg, PTL_OK); + return PTL_OK; + + case RANAL_MSG_PUT_REQ: + if (libmsg == NULL) { /* PUT didn't match... */ + lib_finalize(null, NULL, libmsg, PTL_OK); + return PTL_OK; + } + + tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK); + if (tx == NULL) + return PTL_NO_SPACE; + + rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, mlen); + if (rc != 0) { + kranal_tx_done(tx, rc); + return PTL_FAIL; + } + + kranal_map_buffer(tx); + + tx->tx_msg.ram_u.putack.rapam_src_cookie = + conn->rac_rxmsg->ram_u.putreq.raprm_cookie; + tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie; + tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_key = tx->tx_map_key; + tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_addr = tx->tx_buffer; + tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_nob = mlen; + + tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */ + + kranal_post_fma(conn, tx); + + /* flag matched by consuming rx message */ + kranal_consume_rxmsg(conn, NULL, 0); + return PTL_OK; + } +} + +ptl_err_t +kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg, + unsigned int niov, struct iovec *iov, + size_t offset, size_t mlen, size_t rlen) +{ + return kranal_recvmsg(nal, private, msg, niov, iov, NULL, + offset, mlen, rlen); +} + +ptl_err_t +kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg, + unsigned int niov, ptl_kiov_t *kiov, + size_t offset, size_t mlen, size_t rlen) +{ + return kranal_recvmsg(nal, private, msg, niov, NULL, kiov, + offset, mlen, rlen); +} + +int +kranal_thread_start (int(*fn)(void *arg), void *arg) +{ + long pid = kernel_thread(fn, arg, 0); + + if (pid < 0) + return(int)pid; + + atomic_inc(&kranal_data.kra_nthreads); + return 0; +} + +void +kranal_thread_fini (void) +{ + atomic_dec(&kranal_data.kra_nthreads); +} + +int +kranal_check_conn (kra_conn_t *conn) +{ + kra_tx_t *tx; + struct list_head *ttmp; + unsigned long flags; + long timeout; + unsigned long now = jiffies; + + if (!conn->rac_closing && + time_after_eq(now, conn->rac_last_sent + conn->rac_keepalive * HZ)) { + /* not sent in a while; schedule conn so scheduler sends a keepalive */ + kranal_schedule_conn(conn); + } + + /* wait twice as long for CLOSE to be sure peer is dead */ + timeout = (conn->rac_closing ? 
1 : 2) * conn->rac_timeout * HZ; + + if (!conn->rac_close_recvd && + time_after_eq(now, conn->rac_last_rx + timeout)) { + CERROR("Nothing received from "LPX64" within %d seconds\n", + conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ); + return -ETIMEDOUT; + } + + if (conn->rac_closing) + return 0; + + /* Check the conn's queues are moving. These are "belt+braces" checks, + * in case of hardware/software errors that make this conn seem + * responsive even though it isn't progressing its message queues. */ + + spin_lock_irqsave(&conn->rac_lock, flags); + + list_for_each (ttmp, &conn->rac_fmaq) { + tx = list_entry(ttmp, kra_tx_t, tx_list); + + if (time_after_eq(now, tx->tx_qtime + timeout)) { + spin_unlock_irqrestore(&conn->rac_lock, flags); + CERROR("tx on fmaq for "LPX64" blocked %d seconds\n", + conn->rac_perr->rap_nid, (now - tx->tx_qtime)/HZ); + return -ETIMEDOUT; + } + } + + list_for_each (ttmp, &conn->rac_rdmaq) { + tx = list_entry(ttmp, kra_tx_t, tx_list); + + if (time_after_eq(now, tx->tx_qtime + timeout)) { + spin_unlock_irqrestore(&conn->rac_lock, flags); + CERROR("tx on rdmaq for "LPX64" blocked %d seconds\n", + conn->rac_perr->rap_nid, (now - tx->tx_qtime)/HZ); + return -ETIMEDOUT; + } + } + + list_for_each (ttmp, &conn->rac_replyq) { + tx = list_entry(ttmp, kra_tx_t, tx_list); + + if (time_after_eq(now, tx->tx_qtime + timeout)) { + spin_unlock_irqrestore(&conn->rac_lock, flags); + CERROR("tx on replyq for "LPX64" blocked %d seconds\n", + conn->rac_perr->rap_nid, (now - tx->tx_qtime)/HZ); + return -ETIMEDOUT; + } + } + + spin_unlock_irqrestore(&conn->rac_lock, flags); + return 0; +} + +void +kranal_check_conns (int idx, unsigned long *min_timeoutp) +{ + struct list_head *conns = &kranal_data.kra_conns[idx]; + struct list_head *ctmp; + kra_conn_t *conn; + + again: + /* NB. We expect to check all the conns and not find any problems, so + * we just use a shared lock while we take a look... 
*/ + read_lock(&kranal_data.kra_global_lock); + + list_for_each (ctmp, conns) { + conn = list_entry(ptmp, kra_conn_t, rac_hashlist); + + if (conn->rac_timeout < *min_timeoutp ) + *min_timeoutp = conn->rac_timeout; + if (conn->rac_keepalive < *min_timeoutp ) + *min_timeoutp = conn->rac_keepalive; + + rc = kranal_check_conn(conn); + if (rc == 0) + continue; + + kranal_conn_addref(conn); + read_unlock(&kranal_data.kra_global_lock); + + CERROR("Check on conn to "LPX64"failed: %d\n", + conn->rac_peer->rap_nid, rc); + + write_lock_irqsave(&kranal_data.kra_global_lock); + + if (!conn->rac_closing) + kranal_close_conn_locked(conn, -ETIMEDOUT); + else + kranal_terminate_conn_locked(conn); + + kranal_conn_decref(conn); + + /* start again now I've dropped the lock */ + goto again; + } + + read_unlock(&kranal_data.kra_global_lock); +} + +int +kranal_connd (void *arg) +{ + char name[16]; + wait_queue_t wait; + unsigned long flags; + kra_peer_t *peer; + int i; + + snprintf(name, sizeof(name), "kranal_connd_%02ld", (long)arg); + kportal_daemonize(name); + kportal_blockallsigs(); + + init_waitqueue_entry(&wait, current); + + spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); + + while (!kranal_data.kra_shutdown) { + /* Safe: kra_shutdown only set when quiescent */ + + if (!list_empty(&kranal_data.kra_connd_peers)) { + peer = list_entry(kranal_data.kra_connd_peers.next, + kra_peer_t, rap_connd_list); + + list_del_init(&peer->rap_connd_list); + spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); + + kranal_connect(peer); + kranal_put_peer(peer); + + spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); + continue; + } + + set_current_state(TASK_INTERRUPTIBLE); + add_wait_queue(&kranal_data.kra_connd_waitq, &wait); + + spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); + + schedule (); + + set_current_state(TASK_RUNNING); + remove_wait_queue(&kranal_data.kra_connd_waitq, &wait); + + spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); + } + + spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); + + kranal_thread_fini(); + return 0; +} + +void +kranal_update_reaper_timeout(long timeout) +{ + unsigned long flags; + + LASSERT (timeout > 0); + + spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags); + + if (timeout < kranal_data.kra_new_min_timeout) + kranal_data.kra_new_min_timeout = timeout; + + spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags); +} + +int +kranal_reaper (void *arg) +{ + wait_queue_t wait; + unsigned long flags; + kra_conn_t *conn; + kra_peer_t *peer; + unsigned long flags; + long timeout; + int i; + int conn_entries = kranal_data.kra_conn_hash_size; + int conn_index = 0; + int base_index = conn_entries - 1; + unsigned long next_check_time = jiffies; + long next_min_timeout = MAX_SCHEDULE_TIMEOUT; + long current_min_timeout = 1; + + kportal_daemonize("kranal_reaper"); + kportal_blockallsigs(); + + init_waitqueue_entry(&wait, current); + + spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags); + kranal_data.kra_new_min_timeout = 1; + + while (!kranal_data.kra_shutdown) { + + /* careful with the jiffy wrap... */ + timeout = (long)(next_check_time - jiffies); + if (timeout <= 0) { + + /* I wake up every 'p' seconds to check for + * timeouts on some more peers. I try to check + * every connection 'n' times within the global + * minimum of all keepalive and timeout intervals, + * to ensure I attend to every connection within + * (n+1)/n times its timeout intervals. 
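The comment above is the key to the reaper's pacing: it wakes every p seconds and scans just enough hash buckets that every connection is examined roughly n times per minimum timeout interval. The chunk size is computed as in the sketch below, which reproduces the arithmetic from kranal_reaper() for a few example minimum timeouts (101 buckets, p = 1, n = 3):

#include <stdio.h>

#define RANAL_CONN_HASH_SIZE 101

/* mirrors the chunk computation in kranal_reaper() */
static int reaper_chunk(int conn_entries, int p, int n, long min_timeout)
{
        int chunk = conn_entries;

        if (min_timeout > n * p)
                chunk = (int)((chunk * n * p) / min_timeout);
        if (chunk == 0)
                chunk = 1;
        return chunk;
}

int main(void)
{
        const int p = 1;        /* reaper wakes every p seconds */
        const int n = 3;        /* visit each conn n times per timeout */
        long timeouts[] = { 1, 5, 15, 30, 300 };
        unsigned int i;

        for (i = 0; i < sizeof(timeouts)/sizeof(timeouts[0]); i++)
                printf("min timeout %3lds -> check %d of %d buckets per wakeup\n",
                       timeouts[i],
                       reaper_chunk(RANAL_CONN_HASH_SIZE, p, n, timeouts[i]),
                       RANAL_CONN_HASH_SIZE);
        return 0;
}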
*/ + + const int p = 1; + const int n = 3; + unsigned long min_timeout; + int chunk; + + if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) { + /* new min timeout set: restart min timeout scan */ + next_min_timeout = MAX_SCHEDULE_TIMEOUT; + base_index = conn_index - 1; + if (base_index < 0) + base_index = conn_entries - 1; + + if (kranal_data.kra_new_min_timeout < current_min_timeout) { + current_min_timeout = kranal_data.kra_new_min_timeout; + CWARN("Set new min timeout %ld\n", + current_min_timeout); + } + + kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT; + } + min_timeout = current_min_timeout; + + spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, + flags); + + LASSERT (min_timeout > 0); + + /* Compute how many table entries to check now so I + * get round the whole table fast enough (NB I do + * this at fixed intervals of 'p' seconds) */ + chunk = conn_entries; + if (min_timeout > n * p) + chunk = (chunk * n * p) / min_timeout; + if (chunk == 0) + chunk = 1; + + for (i = 0; i < chunk; i++) { + kranal_check_conns(conn_index, + &next_min_timeout); + conn_index = (conn_index + 1) % conn_entries; + } + + next_check_time += p * HZ; + + spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags); + + if (((conn_index - chunk <= base_index && + base_index < conn_index) || + (conn_index - conn_entries - chunk <= base_index && + base_index < conn_index - conn_entries))) { + + /* Scanned all conns: set current_min_timeout... */ + if (current_min_timeout != next_min_timeout) { + current_min_timeout = next_min_timeout; + CWARN("Set new min timeout %ld\n", + current_min_timeout); + } + + /* ...and restart min timeout scan */ + next_min_timeout = MAX_SCHEDULE_TIMEOUT; + base_index = conn_index - 1; + if (base_index < 0) + base_index = conn_entries - 1; + } + } + + set_current_state(TASK_INTERRUPTIBLE); + add_wait_queue(&kranal_data.kra_reaper_waitq, &wait); + + spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags); + + busy_loops = 0; + schedule_timeout(timeout); + + spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags); + + set_current_state(TASK_RUNNING); + remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait); + } + + kranal_thread_fini(); + return 0; +} + +void +kranal_process_rdmaq (__u32 cqid) +{ + kra_conn_t *conn; + kra_tx_t *tx; + RAP_RETURN rrc; + unsigned long flags; + RAP_RDMA_DESCRIPTOR *desc; + + read_lock(&kranal_data.kra_global_lock); + + conn = kranal_cqid2conn_locked(cqid); + LASSERT (conn != NULL); + + rrc = RapkRdmaDone(conn->rac_rihandle, &desc); + LASSERT (rrc == RAP_SUCCESS); + + spin_lock_irqsave(&conn->rac_lock, flags); + + LASSERT (!list_empty(&conn->rac_rdmaq)); + tx = list_entry(con->rac_rdmaq.next, kra_tx_t, tx_list); + list_del(&tx->tx_list); + + LASSERT(desc->AppPtr == (void *)tx); + LASSERT(desc->tx_msg.ram_type == RANAL_MSG_PUT_DONE || + desc->tx_msg.ram_type == RANAL_MSG_GET_DONE); + + list_add_tail(&tx->tx_list, &conn->rac_fmaq); + tx->tx_qtime = jiffies; + + spin_unlock_irqrestore(&conn->rac_lock, flags); + + /* Get conn's fmaq processed, now I've just put something there */ + kranal_schedule_conn(conn); + + read_unlock(&kranal_data.kra_global_lock); +} + +int +kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg, + void *immediate, int immediatenob) +{ + int sync = (msg->ram_type & RANAL_MSG_FENCE) != 0; + + LASSERT (sizeof(*msg) <= RANAL_FMA_PREFIX_LEN); + LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ? 
+ immediatenob <= RANAL_FMA_MAX_DATA_LEN : + immediatenob == 0); + + msg->ram_incarnation = conn->rac_incarnation; + msg->ram_seq = conn->rac_tx_seq; + + if (sync) + rrc = RapkFmaSyncSend(conn->rac_device.rad_handle, + immediate, immediatenob, + msg, sizeof(*msg)); + else + rrc = RapkFmaSend(conn->rac_device.rad_handle, + immediate, immediatenob, + msg, sizeof(*msg)); + + switch (rrc) { + case RAP_SUCCESS: + conn->rac_last_tx = jiffies; + conn->rac_tx_seq++; + return 0; + + case RAP_NOT_DONE: + return -EAGAIN; + + default: + LBUG(); + } +} + +int +kranal_process_fmaq (kra_conn_t *conn) +{ + unsigned long flags; + int more_to_do; + kra_tx_t *tx; + int rc; + int expect_reply; + + /* NB I will be rescheduled some via a rad_fma_cq event if my FMA is + * out of credits when I try to send right now... */ + + if (conn->rac_closing) { + + if (!list_empty(&conn->rac_rdmaq)) { + /* Can't send CLOSE yet; I'm still waiting for RDMAs I + * posted to finish */ + LASSERT (!conn->rac_close_sent); + kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP); + kranal_sendmsg(conn, &conn->rac_msg, NULL, 0); + return 0; + } + + if (conn->rac_close_sent) + return 0; + + kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE); + rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0); + conn->rac_close_sent = (rc == 0); + return 0; + } + + spin_lock_irqsave(&conn->rac_lock, flags); + + if (list_empty(&conn->rac_fmaq)) { + + spin_unlock_irqrestore(&conn->rac_lock, flags); + + if (time_after_eq(conn->rac_last_tx + conn->rac_keepalive)) { + kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP); + kranal_sendmsg(conn, &conn->rac_msg, NULL, 0); + } + return 0; + } + + tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list); + list_del(&tx->tx_list); + more_to_do = !list_empty(&conn->rac_fmaq); + + spin_unlock_irqrestore(&conn->rac_lock, flags); + + expect_reply = 0; + switch (tx->tx_msg.ram_type) { + default: + LBUG(); + + case RANAL_MSG_IMMEDIATE: + case RANAL_MSG_PUT_NAK: + case RANAL_MSG_PUT_DONE: + case RANAL_MSG_GET_NAK: + case RANAL_MSG_GET_DONE: + rc = kranal_sendmsg(conn, &tx->tx_msg, + tx->tx_buffer, tx->tx_nob); + expect_reply = 0; + break; + + case RANAL_MSG_PUT_REQ: + tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie; + rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0); + kranal_map_buffer(tx); + expect_reply = 1; + break; + + case RANAL_MSG_PUT_ACK: + rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0); + expect_reply = 1; + break; + + case RANAL_MSG_GET_REQ: + kranal_map_buffer(tx); + tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie; + tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key; + tx->tx_msg.ram_u.get.ragm_desc.rard_addr = tx->tx_buffer; + tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob; + rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0); + expect_reply = 1; + break; + } + + if (rc == -EAGAIN) { + /* replace at the head of the list for later */ + spin_lock_irqsave(&conn->rac_lock, flags); + list_add(&tx->tx_list, &conn->rac_fmaq); + spin_unlock_irqrestore(&conn->rac_lock, flags); + + return 0; + } + + LASSERT (rc == 0); + + if (!expect_reply) { + kranal_tx_done(tx, 0); + } else { + spin_lock_irqsave(&conn->rac_lock, flags); + list_add_tail(&tx->tx_list, &conn->rac_replyq); + tx->tx_qtime = jiffies; + spin_unlock_irqrestore(&conn->rac_lock, flags); + } + + return more_to_do; +} + +static inline void +kranal_swab_rdma_desc (kra_rdma_desc_t *d) +{ + __swab64s(&d->rard_key.Key); + __swab16s(&d->rard_key.Cookie); + __swab16s(&d->rard_key.MdHandle); + __swab32s(&d->rard_key.Flags); + __swab64s(&d->rard_addr); + 
__swab32s(&d->rard_nob); +} + +kra_tx_t * +kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie) +{ + unsigned long flags; + struct list_head *ttmp; + kra_tx_t *tx; + + list_for_each(ttmp, &conn->rac_replyq) { + tx = list_entry(ttmp, kra_tx_t, tx_list); + + if (tx->tx_cookie != cookie) + continue; + + if (tx->tx_msg.ram_type != type) { + CWARN("Unexpected type %x (%x expected) " + "matched reply from "LPX64"\n", + tx->tx_msg.ram_type, type, + conn->rac_peer->rap_nid); + return NULL; + } + } + + CWARN("Unmatched reply from "LPX64"\n", conn->rac_peer->rap_nid); + return NULL; +} + +int +kranal_process_receives(kra_conn_t *conn) +{ + unsigned long flags; + __u32 seq; + __u32 nob; + kra_msg_t *msg; + RAP_RETURN rrc = RapkFmaGetPrefix(conn->rac_rihandle, &msg); + kra_peer_t *peer = conn->rac_peer; + + if (rrc == RAP_NOT_DONE) + return 0; + + LASSERT (rrc == RAP_SUCCESS); + conn->rac_last_rx = jiffies; + seq = conn->rac_seq++; + + if (msg->ram_magic != RANAL_MSG_MAGIC) { + if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) { + CERROR("Unexpected magic %08x from "LPX64"\n", + msg->ram_magic, peer->rap_nid); + goto out; + } + + __swab32s(&msg->ram_magic); + __swab16s(&msg->ram_version); + __swab16s(&msg->ram_type); + __swab64s(&msg->ram_srcnid); + __swab64s(&msg->ram_incarnation); + __swab32s(&msg->ram_seq); + + /* NB message type checked below; NOT here... */ + switch (msg->ram_type) { + case RANAL_MSG_PUT_ACK: + kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc); + break; + + case RANAL_MSG_GET_REQ: + kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc); + break; + + default: + break; + } + } + + if (msg->ram_version != RANAL_MSG_VERSION) { + CERROR("Unexpected protocol version %d from "LPX64"\n", + msg->ram_version, peer->rap_nid); + goto out; + } + + if (msg->ram_srcnid != peer->rap_nid) { + CERROR("Unexpected peer "LPX64" from "LPX64"\n", + msg->ram_srcnid, peer->rap_nid); + goto out; + } + + if (msg->ram_incarnation != conn->rac_incarnation) { + CERROR("Unexpected incarnation "LPX64"("LPX64 + " expected) from "LPX64"\n", + msg->ram_incarnation, conn->rac_incarnation, + peer->rap_nid); + goto out; + } + + if (msg->ram_seq != seq) { + CERROR("Unexpected sequence number %d(%d expected) from " + LPX64"\n", msg->ram_seq, seq, peer->rap_nid); + goto out; + } + + if ((msg->ram_type & RANAL_MSG_FENCE) != 0) { + /* This message signals RDMA completion: wait now... 
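Before a received message is dispatched, kranal_process_receives() validates it: if the magic is byte-swapped the header is flipped in place, then the version, source NID, incarnation and per-connection sequence number are checked. The standalone sketch below reproduces the magic/version/sequence part of that validation with a minimal header struct; the return codes are illustrative, not the driver's errnos:

#include <stdio.h>
#include <stdint.h>

#define RANAL_MSG_MAGIC   0x0be91b92u
#define RANAL_MSG_VERSION 1

struct hdr {                    /* just the fixed fields checked on receive */
        uint32_t magic;
        uint16_t version;
        uint16_t type;
        uint32_t seq;
};

static uint32_t swab32(uint32_t v)
{
        return (v >> 24) | ((v >> 8) & 0xff00) |
               ((v << 8) & 0xff0000) | (v << 24);
}

static uint16_t swab16(uint16_t v) { return (uint16_t)((v >> 8) | (v << 8)); }

/* 0 = accept, negative = protocol error (mirrors the checks, not the errnos) */
static int check_rx(struct hdr *h, uint32_t expected_seq)
{
        if (h->magic != RANAL_MSG_MAGIC) {
                if (swab32(h->magic) != RANAL_MSG_MAGIC)
                        return -1;              /* not our protocol at all */
                /* peer has opposite endianness: flip the header in place */
                h->magic   = swab32(h->magic);
                h->version = swab16(h->version);
                h->type    = swab16(h->type);
                h->seq     = swab32(h->seq);
        }
        if (h->version != RANAL_MSG_VERSION)
                return -2;
        if (h->seq != expected_seq)
                return -3;                      /* lost or reordered FMA */
        return 0;
}

int main(void)
{
        struct hdr ok   = { RANAL_MSG_MAGIC, 1, 0x02, 7 };
        struct hdr flip = { swab32(RANAL_MSG_MAGIC), swab16(1), swab16(0x02), swab32(7) };

        printf("native-order msg: %d\n", check_rx(&ok, 7));
        printf("byteswapped msg:  %d\n", check_rx(&flip, 7));
        printf("bad sequence:     %d\n", check_rx(&ok, 8));
        return 0;
}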
*/ + rrc = RapkFmaSyncWait(conn->rac_rihandle); + LASSERT (rrc == RAP_SUCCESS); + } + + if (msg->ram_type == RANAL_MSG_CLOSE) { + conn->rac_close_recvd = 1; + write_lock_irqsave(&kranal_data.kra_global_lock); + + if (!conn->rac_closing) + kranal_close_conn_locked(conn, -ETIMEDOUT); + else if (conn->rac_close_sent) + kranal_terminate_conn_locked(conn); + + goto out; + } + + if (conn->rac_closing) + goto out; + + conn->rac_rxmsg = msg; /* stash message for portals callbacks */ + /* they'll NULL rac_rxmsg if they consume it */ + switch (msg->ram_type) { + case RANAL_MSG_NOOP: + /* Nothing to do; just a keepalive */ + break; + + case RANAL_MSG_IMMEDIATE: + lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn); + break; + + case RANAL_MSG_PUT_REQ: + lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn); + + if (conn->rac_rxmsg == NULL) /* lib_parse matched something */ + break; + + tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK); + if (tx == NULL) + break; + + tx->tx_msg.ram_u.racm_cookie = msg->msg_u.putreq.raprm_cookie; + kranal_post_fma(conn, tx); + break; + + case RANAL_MSG_PUT_NAK: + tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ, + msg->ram_u.completion.racm_cookie); + if (tx == NULL) + break; + + LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED || + tx->tx_buftype == RANAL_BUF_VIRT_MAPPED); + kranal_tx_done(tx, -ENOENT); /* no match */ + break; + + case RANAL_MSG_PUT_ACK: + tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ, + msg->ram_u.putack.rapam_src_cookie); + if (tx == NULL) + break; + + kranal_rdma(tx, RANAL_MSG_PUT_DONE, + &msg->ram_u.putack.rapam_desc, + msg->msg_u.putack.rapam_desc.rard_nob, + msg->ram_u.putack.rapam_dst_cookie); + break; + + case RANAL_MSG_PUT_DONE: + tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK, + msg->ram_u.completion.racm_cookie); + if (tx == NULL) + break; + + LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED || + tx->tx_buftype == RANAL_BUF_VIRT_MAPPED); + kranal_tx_done(tx, 0); + break; + + case RANAL_MSG_GET_REQ: + lib_parse(&kranal_lib, &msg->ram_u.getreq.ragm_hdr, conn); + + if (conn->rac_rxmsg == NULL) /* lib_parse matched something */ + break; + + tx = kranal_new_tx_msg(0, RANAL_MSG_GET_NAK); + if (tx == NULL) + break; + + tx->tx_msg.ram_u.racm_cookie = msg->msg_u.getreq.ragm_cookie; + kranal_post_fma(conn, tx); + break; + + case RANAL_MSG_GET_NAK: + tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ, + msg->ram_u.completion.racm_cookie); + if (tx == NULL) + break; + + LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED || + tx->tx_buftype == RANAL_BUF_VIRT_MAPPED); + kranal_tx_done(tx, -ENOENT); /* no match */ + break; + + case RANAL_MSG_GET_DONE: + tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ, + msg->ram_u.completion.racm_cookie); + if (tx == NULL) + break; + + LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED || + tx->tx_buftype == RANAL_BUF_VIRT_MAPPED); + kranal_tx_done(tx, 0); + break; + } + + out: + if (conn->rac_msg != NULL) + kranal_consume_rxmsg(conn, NULL, 0); + + return 1; +} + +int +kranal_scheduler (void *arg) +{ + kra_device_t *dev = (kra_device_t *)arg; + wait_queue_t wait; + char name[16]; + kra_conn_t *conn; + unsigned long flags; + int rc; + int i; + __u32 cqid; + int did_something; + int busy_loops = 0; + + snprintf(name, sizeof(name), "kranal_sd_%02ld", dev->rad_idx); + kportal_daemonize(name); + kportal_blockallsigs(); + + init_waitqueue_entry(&wait, current); + + spin_lock_irqsave(&dev->rad_lock, flags); + + while (!kranal_data.kra_shutdown) { + /* Safe: kra_shutdown only set when quiescent */ + + if (busy_loops++ >= 
RANAL_RESCHED) { + spin_unlock_irqrestore(&dev->rad_lock, flags); + + our_cond_resched(); + busy_loops = 0; + + spin_lock_irqsave(&dev->rad_lock, flags); + } + + did_something = 0; + + if (dev->rad_ready) { + dev->rad_ready = 0; + spin_unlock_irqrestore(&dev->rad_lock, flags); + + rrc = RapkCQDone(dev->rad_rdma_cq, &cqid, &event_type); + + LASSERT (rrc == RAP_SUCCESS || rrc == RAP_NOT_DONE); + LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0); + + if (rrc == RAP_SUCCESS) { + kranal_process_rdmaq(cqid); + did_something = 1; + } + + rrc = RapkCQDone(dev->rad_fma_cq, &cqid, &event_type); + LASSERT (rrc == RAP_SUCCESS || rrc == RAP_NOT_DONE); + + if (rrc == RAP_SUCCESS) { + if ((event_type & RAPK_CQ_EVENT_OVERRUN) != 0) + kranal_schedule_dev(dev); + else + kranal_schedule_cqid(cqid); + did_something = 1; + } + + spin_lock_irqsave(&dev->rad_lock, flags); + + /* If there were no completions to handle, I leave + * rad_ready clear. NB I cleared it BEFORE I checked + * the completion queues since I'm racing with the + * device callback. */ + + if (did_something) + dev->rad_ready = 1; + } + + if (!list_empty(&dev->rad_connq)) { + conn = list_entry(dev->rad_connq.next, + kra_conn_t, rac_schedlist); + list_del(&conn->rac_schedlist); + spin_unlock_irqrestore(&dev->rad_lock, flags); + + LASSERT (conn->rac_scheduled); + + resched = kranal_process_fmaq(conn); + resched |= kranal_process_receives(conn); + did_something = 1; + + spin_lock_irqsave(&dev->rad_lock, flags); + if (resched) + list_add_tail(&conn->rac_schedlist, + &dev->rad_connq); + } + + if (did_something) + continue; + + add_wait_queue(&dev->rad_waitq, &wait); + set_current_state(TASK_INTERRUPTIBLE); + + spin_unlock_irqrestore(&dev->rad_lock, flags); + + busy_loops = 0; + schedule(); + + set_current_state(TASK_RUNNING); + remove_wait_queue(&dev->rad_waitq, &wait); + + spin_lock_irqsave(&dev->rad_lock, flags); + } + + spin_unlock_irqrestore(&dev->rad_lock, flags); + + kranal_thread_fini(); + return 0; +} + + +lib_nal_t kranal_lib = { + libnal_data: &kranal_data, /* NAL private data */ + libnal_send: kranal_send, + libnal_send_pages: kranal_send_pages, + libnal_recv: kranal_recv, + libnal_recv_pages: kranal_recv_pages, + libnal_dist: kranal_dist +}; diff --git a/lustre/portals/knals/ranal/.cvsignore b/lustre/portals/knals/ranal/.cvsignore new file mode 100644 index 0000000..5ed596b --- /dev/null +++ b/lustre/portals/knals/ranal/.cvsignore @@ -0,0 +1,10 @@ +.deps +Makefile +.*.cmd +autoMakefile.in +autoMakefile +*.ko +*.mod.c +.*.flags +.tmp_versions +.depend diff --git a/lustre/portals/knals/ranal/Makefile.in b/lustre/portals/knals/ranal/Makefile.in new file mode 100644 index 0000000..1772cc2 --- /dev/null +++ b/lustre/portals/knals/ranal/Makefile.in @@ -0,0 +1,6 @@ +MODULES := kranal +kranal-objs := ranal.o ranal_cb.o + +EXTRA_POST_CFLAGS := @RACPPFLAGS@ + +@INCLUDE_RULES@ diff --git a/lustre/portals/knals/ranal/autoMakefile.am b/lustre/portals/knals/ranal/autoMakefile.am new file mode 100644 index 0000000..f136aa5 --- /dev/null +++ b/lustre/portals/knals/ranal/autoMakefile.am @@ -0,0 +1,15 @@ +# Copyright (C) 2001 Cluster File Systems, Inc. +# +# This code is issued under the GNU General Public License. 
+# See the file COPYING in this distribution + +if MODULES +if !CRAY_PORTALS +if BUILD_RANAL +modulenet_DATA = kranal$(KMODEXT) +endif +endif +endif + +MOSTLYCLEANFILES = *.o *.ko *.mod.c +DIST_SOURCES = $(kranal-objs:%.o=%.c) ranal.h diff --git a/lustre/portals/knals/ranal/ranal.c b/lustre/portals/knals/ranal/ranal.c new file mode 100644 index 0000000..a59757d --- /dev/null +++ b/lustre/portals/knals/ranal/ranal.c @@ -0,0 +1,1978 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2004 Cluster File Systems, Inc. + * Author: Eric Barton + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ +#include "ranal.h" + + +nal_t kranal_api; +ptl_handle_ni_t kranal_ni; +kra_data_t kranal_data; +kra_tunables_t kranal_tunables; + +#ifdef CONFIG_SYSCTL +#define RANAL_SYSCTL_TIMEOUT 1 +#define RANAL_SYSCTL_LISTENER_TIMEOUT 2 +#define RANAL_SYSCTL_BACKLOG 3 +#define RANAL_SYSCTL_PORT 4 +#define RANAL_SYSCTL_MAX_IMMEDIATE 5 + +#define RANAL_SYSCTL 202 + +static ctl_table kranal_ctl_table[] = { + {RANAL_SYSCTL_TIMEOUT, "timeout", + &kranal_tunables.kra_timeout, sizeof(int), + 0644, NULL, &proc_dointvec}, + {RANAL_SYSCTL_LISTENER_TIMEOUT, "listener_timeout", + &kranal_tunables.kra_listener_timeout, sizeof(int), + 0644, NULL, &proc_dointvec}, + {RANAL_SYSCTL_BACKLOG, "backlog", + &kranal_tunables.kra_backlog, sizeof(int), + 0644, NULL, kranal_listener_procint}, + {RANAL_SYSCTL_PORT, "port", + &kranal_tunables.kra_port, sizeof(int), + 0644, NULL, kranal_listener_procint}, + {RANAL_SYSCTL_MAX_IMMEDIATE, "max_immediate", + &kranal_tunables.kra_max_immediate, sizeof(int), + 0644, NULL, &proc_dointvec}, + { 0 } +}; + +static ctl_table kranal_top_ctl_table[] = { + {RANAL_SYSCTL, "ranal", NULL, 0, 0555, kranal_ctl_table}, + { 0 } +}; +#endif + +int +kranal_sock_write (struct socket *sock, void *buffer, int nob) +{ + int rc; + mm_segment_t oldmm = get_fs(); + struct iovec iov = { + .iov_base = buffer, + .iov_len = nob + }; + struct msghdr msg = { + .msg_name = NULL, + .msg_namelen = 0, + .msg_iov = &iov, + .msg_iovlen = 1, + .msg_control = NULL, + .msg_controllen = 0, + .msg_flags = MSG_DONTWAIT + }; + + /* We've set up the socket's send buffer to be large enough for + * everything we send, so a single non-blocking send should + * complete without error. 
*/ + + set_fs(KERNEL_DS); + rc = sock_sendmsg(sock, &msg, iov.iov_len); + set_fs(oldmm); + + return rc; +} + +int +kranal_sock_read (struct socket *sock, void *buffer, int nob, int timeout) +{ + int rc; + mm_segment_t oldmm = get_fs(); + long ticks = timeout * HZ; + unsigned long then; + struct timeval tv; + + LASSERT (nob > 0); + LASSERT (ticks > 0); + + for (;;) { + struct iovec iov = { + .iov_base = buffer, + .iov_len = nob + }; + struct msghdr msg = { + .msg_name = NULL, + .msg_namelen = 0, + .msg_iov = &iov, + .msg_iovlen = 1, + .msg_control = NULL, + .msg_controllen = 0, + .msg_flags = 0 + }; + + /* Set receive timeout to remaining time */ + tv = (struct timeval) { + .tv_sec = ticks / HZ, + .tv_usec = ((ticks % HZ) * 1000000) / HZ; + }; + set_fs(KERNEL_DS); + rc = sock_setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, + (char *)&tv, sizeof(tv)); + set_fs(oldmm); + if (rc != 0) { + CERROR("Can't set socket recv timeout %d: %d\n", + send_timeout, rc); + return rc; + } + + set_fs(KERNEL_DS); + then = jiffies; + rc = sock_recvmsg(sock, &msg, iov.iov_len, 0); + ticks -= jiffies - then; + set_fs(oldmm); + + if (rc < 0) + return rc; + + if (rc == 0) + return -ECONNABORTED; + + buffer = ((char *)buffer) + rc; + nob -= rc; + + if (nob == 0) + return 0; + + if (ticks <= 0) + return -ETIMEDOUT; + } +} + +int +kranal_create_sock(struct socket **sockp) +{ + struct socket *sock; + int rc; + struct timeval tv; + int option; + mm_segment_t oldmm = get_fs(); + + rc = sock_create(PF_INET, SOCK_STREAM, 0, &sock); + if (rc != 0) { + CERROR("Can't create socket: %d\n", rc); + return rc; + } + + /* Ensure sending connection info doesn't block */ + option = 2 * sizeof(kra_connreq_t); + set_fs(KERNEL_DS); + rc = sock_setsockopt(sock, SOL_SOCKET, SO_SNDBUF, + (char *)&option, sizeof(option)); + set_fs(oldmm); + if (rc != 0) { + CERROR("Can't set send buffer %d: %d\n", option, rc); + goto failed; + } + + option = 1; + set_fs(KERNEL_DS); + rc = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + (char *)&option, sizeof(option)); + set_fs(oldmm); + if (rc != 0) { + CERROR("Can't set SO_REUSEADDR: %d\n", rc); + goto failed; + } + + *sockp = sock; + return 0; + + failed: + sock_release(sock); + return rc; +} + +void +kranal_pause(int ticks) +{ + set_current_state(TASK_UNINTERRUPTIBLE); + schedule_timeout(ticks); +} + +void +kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn) +{ + memset(connreq, 0, sizeof(*connreq)); + + connreq->racr_magic = RANAL_MSG_MAGIC; + connreq->racr_version = RANAL_MSG_VERSION; + connreq->racr_devid = conn->rac_device->rad_id; + connreq->racr_nid = kranal_lib.libnal_ni.ni_pid.nid; + connreq->racr_timeout = conn->rac_timeout; + connreq->racr_incarnation = conn->rac_my_incarnation; + + rrc = RapkGetRiParams(conn->rac_rihandle, &connreq->racr_riparams); + LASSERT(rrc == RAP_SUCCESS); +} + +int +kranal_recv_connreq(struct sock *sock, kra_connreq_t *connreq, int timeout) +{ + int i; + int rc; + + rc = kranal_sock_read(newsock, connreq, sizeof(*connreq), timeout); + if (rc != 0) { + CERROR("Read failed: %d\n", rc); + return rc; + } + + if (connreq->racr_magic != RANAL_MSG_MAGIC) { + if (__swab32(connreq->racr_magic) != RANAL_MSG_MAGIC) { + CERROR("Unexpected magic %08x\n", connreq->racr_magic); + return -EPROTO; + } + + __swab32s(&connreq->racr_magic); + __swab16s(&connreq->racr_version); + __swab16s(&connreq->racr_devid); + __swab64s(&connreq->racr_nid); + __swab64s(&connreq->racr_incarnation); + __swab32s(&connreq->racr_timeout); + + __swab32s(&connreq->racr_riparams.FmaDomainHndl); + 
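kranal_sock_read() above keeps a remaining-time budget in ticks, re-arms SO_RCVTIMEO with whatever is left before each receive, and charges the elapsed jiffies against the budget, so a slow peer cannot stall the listener indefinitely. The userspace sketch below shows the same bounded-read idea with poll() standing in for SO_RCVTIMEO; that substitution is purely to keep the example short, not what the driver does:

#include <errno.h>
#include <poll.h>
#include <stdio.h>
#include <time.h>
#include <unistd.h>

static long now_ms(void)
{
        struct timespec ts;
        clock_gettime(CLOCK_MONOTONIC, &ts);
        return ts.tv_sec * 1000L + ts.tv_nsec / 1000000L;
}

/* read exactly 'nob' bytes or fail: 0, -ETIMEDOUT or -ECONNABORTED */
static int read_exact(int fd, void *buf, size_t nob, long timeout_ms)
{
        char *p = buf;

        while (nob > 0) {
                struct pollfd pfd = { .fd = fd, .events = POLLIN };
                long then = now_ms();
                ssize_t rc;

                if (timeout_ms <= 0)
                        return -ETIMEDOUT;
                if (poll(&pfd, 1, (int)timeout_ms) <= 0)
                        return -ETIMEDOUT;

                rc = read(fd, p, nob);
                if (rc < 0)
                        return -errno;
                if (rc == 0)
                        return -ECONNABORTED;   /* peer closed mid-message */

                p += rc;
                nob -= (size_t)rc;
                timeout_ms -= now_ms() - then;  /* charge elapsed time */
        }
        return 0;
}

int main(void)
{
        int fds[2];
        char buf[4];
        ssize_t n;

        if (pipe(fds) != 0)
                return 1;
        n = write(fds[1], "hi", 2);             /* only part of the message arrives */
        (void)n;
        printf("partial read rc=%d (expect -ETIMEDOUT=%d)\n",
               read_exact(fds[0], buf, sizeof(buf), 200), -ETIMEDOUT);
        return 0;
}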
+                __swab32s(&connreq->racr_riparams.RcvCqHndl);
+                __swab32s(&connreq->racr_riparams.PTag);
+                __swab32s(&connreq->racr_riparams.CompletionCookie);
+        }
+
+        if (connreq->racr_version != RANAL_MSG_VERSION) {
+                CERROR("Unexpected version %d\n", connreq->racr_version);
+                return -EPROTO;
+        }
+
+        if (connreq->racr_nid == PTL_NID_ANY) {
+                CERROR("Received PTL_NID_ANY\n");
+                return -EPROTO;
+        }
+
+        if (connreq->racr_timeout < RANAL_MIN_TIMEOUT) {
+                CERROR("Received timeout %d < MIN %d\n",
+                       connreq->racr_timeout, RANAL_MIN_TIMEOUT);
+                return -EPROTO;
+        }
+
+        for (i = 0; i < kranal_data.kra_ndevs; i++)
+                if (connreq->racr_devid ==
+                    kranal_data.kra_devices[i].rad_id)
+                        break;
+
+        if (i == kranal_data.kra_ndevs) {
+                CERROR("Can't match device %d\n", connreq->racr_devid);
+                return -ENODEV;
+        }
+
+        return 0;
+}
+
+int
+kranal_conn_isdup_locked(kra_peer_t *peer, __u64 incarnation)
+{
+        kra_conn_t       *conn;
+        struct list_head *tmp;
+        int               loopback = 0;
+
+        list_for_each(tmp, &peer->rap_conns) {
+                conn = list_entry(tmp, kra_conn_t, rac_list);
+
+                if (conn->rac_incarnation < incarnation) {
+                        /* Conns with an older incarnation get culled later */
+                        continue;
+                }
+
+                if (!loopback &&
+                    conn->rac_incarnation == incarnation &&
+                    peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid) {
+                        /* loopback creates 2 conns */
+                        loopback = 1;
+                        continue;
+                }
+
+                return 1;
+        }
+
+        return 0;
+}
+
+void
+kranal_set_conn_uniqueness (kra_conn_t *conn)
+{
+        unsigned long  flags;
+
+        write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+        conn->rac_my_incarnation = kranal_data.kra_next_incarnation++;
+
+        do {    /* allocate a unique cqid */
+                conn->rac_cqid = kranal_data.kra_next_cqid++;
+        } while (kranal_cqid2conn_locked(conn->rac_cqid) != NULL);
+
+        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+}
+
+int
+kranal_alloc_conn(kra_conn_t **connp, kra_device_t *dev, __u32 cqid)
+{
+        kra_conn_t  *conn;
+        RAP_RETURN   rrc;
+
+        LASSERT (!in_interrupt());
+        PORTAL_ALLOC(conn, sizeof(*conn));
+
+        if (conn == NULL)
+                return -ENOMEM;
+
+        memset(conn, 0, sizeof(*conn));
+        conn->rac_cqid = cqid;
+        atomic_set(&conn->rac_refcount, 1);
+        INIT_LIST_HEAD(&conn->rac_list);
+        INIT_LIST_HEAD(&conn->rac_hashlist);
+        INIT_LIST_HEAD(&conn->rac_fmaq);
+        INIT_LIST_HEAD(&conn->rac_rdmaq);
+        INIT_LIST_HEAD(&conn->rac_replyq);
+        spin_lock_init(&conn->rac_lock);
+
+        conn->rac_timeout = MAX(kranal_tunables.kra_timeout, RANAL_MIN_TIMEOUT);
+        kranal_update_reaper_timeout(conn->rac_timeout);
+
+        rrc = RapkCreateRi(dev->rad_handle, cqid, dev->rad_ptag,
+                           dev->rad_rdma_cq, dev->rad_fma_cq,
+                           &conn->rac_rihandle);
+        if (rrc != RAP_SUCCESS) {
+                CERROR("RapkCreateRi failed: %d\n", rrc);
+                PORTAL_FREE(conn, sizeof(*conn));
+                return -ENETDOWN;
+        }
+
+        atomic_inc(&kranal_data.kra_nconns);
+        *connp = conn;
+        return 0;
+}
+
+void
+__kranal_conn_decref(kra_conn_t *conn)
+{
+        kra_tx_t   *tx;
+        RAP_RETURN  rrc;
+
+        LASSERT (!in_interrupt());
+        LASSERT (!conn->rac_scheduled);
+        LASSERT (list_empty(&conn->rac_list));
+        LASSERT (list_empty(&conn->rac_hashlist));
+        LASSERT (atomic_read(&conn->rac_refcount) == 0);
+
+        while (!list_empty(&conn->rac_fmaq)) {
+                tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
+
+                list_del(&tx->tx_list);
+                kranal_tx_done(tx, -ECONNABORTED);
+        }
+
+        /* We may not destroy this connection while it has RDMAs outstanding */
+        LASSERT (list_empty(&conn->rac_rdmaq));
+
+        while (!list_empty(&conn->rac_replyq)) {
+                tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
+
+                list_del(&tx->tx_list);
+                kranal_tx_done(tx, -ECONNABORTED);
+        }
+
+        rrc =
RapkDestroyRi(conn->rac_device->rad_handle,
+                           conn->rac_rihandle);
+        LASSERT (rrc == RAP_SUCCESS);
+
+        if (conn->rac_peer != NULL)
+                kranal_peer_decref(conn->rac_peer);
+
+        PORTAL_FREE(conn, sizeof(*conn));
+        atomic_dec(&kranal_data.kra_nconns);
+}
+
+void
+kranal_terminate_conn_locked (kra_conn_t *conn)
+{
+        kra_peer_t *peer = conn->rac_peer;
+
+        LASSERT (!in_interrupt());
+        LASSERT (conn->rac_closing);
+        LASSERT (!list_empty(&conn->rac_hashlist));
+        LASSERT (list_empty(&conn->rac_list));
+
+        /* Remove from conn hash table (no new callbacks) */
+        list_del_init(&conn->rac_hashlist);
+        kranal_conn_decref(conn);
+
+        /* Conn is now just waiting for remaining refs to go */
+}
+
+void
+kranal_close_conn_locked (kra_conn_t *conn, int error)
+{
+        kra_peer_t *peer = conn->rac_peer;
+
+        CDEBUG(error == 0 ? D_NET : D_ERROR,
+               "closing conn to "LPX64": error %d\n", peer->rap_nid, error);
+
+        LASSERT (!in_interrupt());
+        LASSERT (!conn->rac_closing);
+        LASSERT (!list_empty(&conn->rac_hashlist));
+        LASSERT (!list_empty(&conn->rac_list));
+
+        list_del_init(&conn->rac_list);
+
+        if (list_empty(&peer->rap_conns) &&
+            peer->rap_persistence == 0) {
+                /* Non-persistent peer with no more conns... */
+                kranal_unlink_peer_locked(peer);
+        }
+
+        conn->rac_closing = 1;
+        kranal_schedule_conn(conn);
+
+        kranal_conn_decref(conn);               /* lose peer's ref */
+}
+
+void
+kranal_close_conn (kra_conn_t *conn, int error)
+{
+        unsigned long flags;
+
+        write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+        if (!conn->rac_closing)
+                kranal_close_conn_locked(conn, error);
+
+        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+}
+
+int
+kranal_passive_conn_handshake (struct socket *sock,
+                               ptl_nid_t *peer_nidp, kra_conn_t **connp)
+{
+        struct sockaddr_in   addr;
+        __u32                peer_ip;
+        unsigned int         peer_port;
+        kra_connreq_t        connreq;
+        ptl_nid_t            peer_nid;
+        kra_conn_t          *conn;
+        kra_device_t        *dev;
+        RAP_RETURN           rrc;
+        int                  len;
+        int                  rc;
+        int                  i;
+
+        len = sizeof(addr);
+        rc = sock->ops->getname(sock, (struct sockaddr *)&addr, &len, 2);
+        if (rc != 0) {
+                CERROR("Can't get peer's IP: %d\n", rc);
+                return rc;
+        }
+
+        peer_ip = ntohl(addr.sin_addr.s_addr);
+        peer_port = ntohs(addr.sin_port);
+
+        if (peer_port >= 1024) {
+                CERROR("Refusing unprivileged connection from %u.%u.%u.%u/%d\n",
+                       HIPQUAD(peer_ip), peer_port);
+                return -ECONNREFUSED;
+        }
+
+        rc = kranal_recv_connreq(sock, &connreq,
+                                 kranal_tunables.kra_listener_timeout);
+        if (rc != 0) {
+                CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
+                       HIPQUAD(peer_ip), peer_port, rc);
+                return rc;
+        }
+
+        peer_nid = connreq.racr_nid;
+        LASSERT (peer_nid != PTL_NID_ANY);
+
+        for (i = 0;;i++) {
+                LASSERT(i < kranal_data.kra_ndevs);
+                dev = &kranal_data.kra_devices[i];
+                if (dev->rad_id == connreq.racr_devid)
+                        break;
+        }
+
+        rc = kranal_alloc_conn(&conn, dev, (__u32)(peer_nid & 0xffffffff));
+        if (rc != 0)
+                return rc;
+
+        conn->rac_peer_incarnation = connreq.racr_incarnation;
+        conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq.racr_timeout);
+        kranal_update_reaper_timeout(conn->rac_keepalive);
+
+        rrc = RapkSetRiParams(conn->rac_rihandle, &connreq.racr_riparams);
+        if (rrc != RAP_SUCCESS) {
+                CERROR("Can't set riparams for "LPX64": %d\n", peer_nid, rrc);
+                kranal_conn_decref(conn);
+                return -EPROTO;
+        }
+
+        kranal_pack_connreq(&connreq, conn);
+
+        rc = kranal_sock_write(sock, &connreq, sizeof(connreq));
+        if (rc != 0) {
+                CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
+                       HIPQUAD(peer_ip), peer_port, rc);
+                kranal_conn_decref(conn);
+                return rc;
+        }
+
+        *connp = conn;
+        *peer_nidp = peer_nid;
+        return 0;
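+        /* NB the conn returned above carries the single reference taken in
+         * kranal_alloc_conn(); it is the caller's responsibility to install
+         * the conn in the peer/cqid tables or drop that reference on error. */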
+}
+
+int
+ranal_connect_sock(kra_peer_t *peer, struct socket **sockp)
+{
+        struct sockaddr_in locaddr;
+        struct sockaddr_in srvaddr;
+        struct socket     *sock;
+        unsigned int       port;
+        int                rc;
+        int                option;
+        mm_segment_t       oldmm = get_fs();
+        struct timeval     tv;
+
+        for (port = 1023; port >= 512; port--) {
+
+                memset(&locaddr, 0, sizeof(locaddr));
+                locaddr.sin_family      = AF_INET;
+                locaddr.sin_port        = htons(port);
+                locaddr.sin_addr.s_addr = htonl(INADDR_ANY);
+
+                memset (&srvaddr, 0, sizeof (srvaddr));
+                srvaddr.sin_family      = AF_INET;
+                srvaddr.sin_port        = htons (peer->rap_port);
+                srvaddr.sin_addr.s_addr = htonl (peer->rap_ip);
+
+                rc = kranal_create_sock(&sock);
+                if (rc != 0)
+                        return rc;
+
+                rc = sock->ops->bind(sock,
+                                     (struct sockaddr *)&locaddr, sizeof(locaddr));
+                if (rc != 0) {
+                        sock_release(sock);
+
+                        if (rc == -EADDRINUSE) {
+                                CDEBUG(D_NET, "Port %d already in use\n", port);
+                                continue;
+                        }
+
+                        CERROR("Can't bind to reserved port %d: %d\n", port, rc);
+                        return rc;
+                }
+
+                rc = sock->ops->connect(sock,
+                                        (struct sockaddr *)&srvaddr, sizeof(srvaddr),
+                                        0);
+                if (rc == 0) {
+                        *sockp = sock;
+                        return 0;
+                }
+
+                sock_release(sock);
+
+                if (rc != -EADDRNOTAVAIL) {
+                        CERROR("Can't connect port %d to %u.%u.%u.%u/%d: %d\n",
+                               port, HIPQUAD(peer->rap_ip), peer->rap_port, rc);
+                        return rc;
+                }
+
+                CDEBUG(D_NET, "Port %d not available for %u.%u.%u.%u/%d\n",
+                       port, HIPQUAD(peer->rap_ip), peer->rap_port);
+        }
+
+        /* all reserved ports in use */
+        CERROR("Can't connect to %u.%u.%u.%u/%d: no free reserved port\n",
+               HIPQUAD(peer->rap_ip), peer->rap_port);
+        return -EADDRINUSE;
+}
+
+
+int
+kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
+{
+        kra_connreq_t  connreq;
+        kra_conn_t    *conn;
+        kra_device_t  *dev;
+        struct socket *sock;
+        __u32          id32;
+        RAP_RETURN     rrc;
+        int            rc;
+
+        id32 = (__u32)(peer->rap_nid & 0xffffffff);
+        dev = &kranal_data.kra_devices[id32 % kranal_data.kra_ndevs];
+
+        rc = kranal_alloc_conn(&conn, dev, id32);
+        if (rc != 0)
+                return rc;
+
+        kranal_pack_connreq(&connreq, conn);
+
+        rc = ranal_connect_sock(peer, &sock);
+        if (rc != 0)
+                goto failed_0;
+
+        /* CAVEAT EMPTOR: the passive side receives with a SHORT rx timeout
+         * immediately after accepting a connection, so we connect and then
+         * send immediately.
*/
+
+        rc = kranal_sock_write(sock, &connreq, sizeof(connreq));
+        if (rc != 0) {
+                CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
+                       HIPQUAD(peer->rap_ip), peer->rap_port, rc);
+                goto failed_1;
+        }
+
+        rc = kranal_recv_connreq(sock, &connreq, kranal_tunables.kra_timeout);
+        if (rc != 0) {
+                CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
+                       HIPQUAD(peer->rap_ip), peer->rap_port, rc);
+                goto failed_1;
+        }
+
+        sock_release(sock);
+        rc = -EPROTO;
+
+        if (connreq.racr_nid != peer->rap_nid) {
+                CERROR("Unexpected nid from %u.%u.%u.%u/%d: "
+                       "received "LPX64" expected "LPX64"\n",
+                       HIPQUAD(peer->rap_ip), peer->rap_port,
+                       connreq.racr_nid, peer->rap_nid);
+                goto failed_0;
+        }
+
+        if (connreq.racr_devid != dev->rad_id) {
+                CERROR("Unexpected device id from %u.%u.%u.%u/%d: "
+                       "received %d expected %d\n",
+                       HIPQUAD(peer->rap_ip), peer->rap_port,
+                       connreq.racr_devid, dev->rad_id);
+                goto failed_0;
+        }
+
+        conn->rac_peer_incarnation = connreq.racr_incarnation;
+        conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq.racr_timeout);
+        kranal_update_reaper_timeout(conn->rac_keepalive);
+
+        rc = -ENETDOWN;
+        rrc = RapkSetRiParams(conn->rac_rihandle,
+                              &connreq.racr_riparams);
+        if (rrc != RAP_SUCCESS) {
+                CERROR("Can't set riparams for "LPX64": %d\n",
+                       peer->rap_nid, rrc);
+                goto failed_0;
+        }
+
+        *connp = conn;
+        return 0;
+
+ failed_1:
+        sock_release(sock);
+ failed_0:
+        kranal_conn_decref(conn);
+        return rc;
+}
+
+int
+kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
+{
+        kra_peer_t        *peer2;
+        kra_tx_t          *tx;
+        ptl_nid_t          peer_nid;
+        unsigned long      flags;
+        unsigned long      timeout;
+        kra_conn_t        *conn;
+        int                rc;
+        int                nstale;
+
+        if (sock != NULL) {
+                /* passive: listener accepted sock */
+                LASSERT (peer == NULL);
+
+                rc = kranal_passive_conn_handshake(sock, &peer_nid, &conn);
+                if (rc != 0)
+                        return rc;
+
+                /* assume this is a new peer */
+                peer = kranal_create_peer(peer_nid);
+                if (peer == NULL) {
+                        CERROR("Can't allocate peer for "LPX64"\n", peer_nid);
+                        kranal_conn_decref(conn);
+                        return -ENOMEM;
+                }
+
+                write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+                peer2 = kranal_find_peer_locked(peer_nid);
+                if (peer2 == NULL) {
+                        /* peer table takes my initial ref on peer */
+                        list_add_tail(&peer->rap_list,
+                                      kranal_nid2peerlist(peer_nid));
+                } else {
+                        /* peer_nid already in the peer table */
+                        kranal_peer_decref(peer);
+                        peer = peer2;
+                }
+                /* NB I may now have a non-persistent peer in the peer
+                 * table with no connections: I can't drop the global lock
+                 * until I've given it a connection or removed it, and when
+                 * I do 'peer' can disappear under me. */
+        } else {
+                /* active: connd wants to connect to peer */
+                LASSERT (peer != NULL);
+                LASSERT (peer->rap_connecting);
+
+                rc = kranal_active_conn_handshake(peer, &conn);
+                if (rc != 0)
+                        return rc;
+
+                write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+                if (!kranal_peer_active(peer)) {
+                        /* raced with peer getting unlinked */
+                        write_unlock_irqrestore(&kranal_data.kra_global_lock,
+                                                flags);
+                        kranal_conn_decref(conn);
+                        return -ESTALE;
+                }
+        }
+
+        LASSERT (kranal_peer_active(peer));     /* peer is in the peer table */
+        peer_nid = peer->rap_nid;
+
+        /* Refuse to duplicate an existing connection (both sides might try
+         * to connect at once).  NB we return success!
We _do_ have a
+         * connection (so we don't need to remove the peer from the peer
+         * table) and we _don't_ have any blocked txs to complete */
+        if (kranal_conn_isdup_locked(peer, conn->rac_incarnation)) {
+                LASSERT (!list_empty(&peer->rap_conns));
+                LASSERT (list_empty(&peer->rap_tx_queue));
+                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+                CWARN("Not creating duplicate connection to "LPX64"\n",
+                      peer_nid);
+                kranal_conn_decref(conn);
+                return 0;
+        }
+
+        kranal_peer_addref(peer);               /* +1 ref for conn */
+        conn->rac_peer = peer;
+        list_add_tail(&conn->rac_list, &peer->rap_conns);
+
+        kranal_conn_addref(conn);               /* +1 ref for conn table */
+        list_add_tail(&conn->rac_hashlist,
+                      kranal_cqid2connlist(conn->rac_cqid));
+
+        /* Schedule all packets blocking for a connection */
+        while (!list_empty(&peer->rap_tx_queue)) {
+                tx = list_entry(peer->rap_tx_queue.next,
+                                kra_tx_t, tx_list);
+
+                list_del(&tx->tx_list);
+                kranal_queue_tx_locked(tx, conn);
+        }
+
+        nstale = kranal_close_stale_conns_locked(peer, conn->rac_incarnation);
+
+        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+
+        /* CAVEAT EMPTOR: passive peer can disappear NOW */
+
+        if (nstale != 0)
+                CWARN("Closed %d stale conns to "LPX64"\n", nstale, peer_nid);
+
+        /* Ensure conn gets checked.  Transmits may have been queued and an
+         * FMA event may have happened before it got in the cq hash table */
+        kranal_schedule_conn(conn);
+        return 0;
+}
+
+void
+kranal_connect (kra_peer_t *peer)
+{
+        kra_tx_t          *tx;
+        unsigned long      flags;
+        struct list_head   zombies;
+        int                rc;
+
+        LASSERT (peer->rap_connecting);
+
+        rc = kranal_conn_handshake(NULL, peer);
+
+        write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+        LASSERT (peer->rap_connecting);
+        peer->rap_connecting = 0;
+
+        if (rc == 0) {
+                /* kranal_conn_handshake() queues blocked txs immediately on
+                 * success to avoid messages jumping the queue */
+                LASSERT (list_empty(&peer->rap_tx_queue));
+
+                /* reset reconnection timeouts */
+                peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL;
+                peer->rap_reconnect_time = CURRENT_TIME;
+
+                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+                return;
+        }
+
+        LASSERT (peer->rap_reconnect_interval != 0);
+        peer->rap_reconnect_time = CURRENT_TIME + peer->rap_reconnect_interval;
+        peer->rap_reconnect_interval = MIN(RANAL_MAX_RECONNECT_INTERVAL,
+                                           2 * peer->rap_reconnect_interval);
+
+        /* Grab all blocked packets while we have the global lock */
+        list_add(&zombies, &peer->rap_tx_queue);
+        list_del_init(&peer->rap_tx_queue);
+
+        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+
+        if (list_empty(&zombies))
+                return;
+
+        CWARN("Dropping packets for "LPX64": connection failed\n",
+              peer->rap_nid);
+
+        do {
+                tx = list_entry(zombies.next, kra_tx_t, tx_list);
+
+                list_del(&tx->tx_list);
+                kranal_tx_done(tx, -EHOSTUNREACH);
+
+        } while (!list_empty(&zombies));
+}
+
+int
+kranal_listener(void *arg)
+{
+        struct sockaddr_in addr;
+        wait_queue_t       wait;
+        struct socket     *sock;
+        struct socket     *newsock;
+        int                port;
+        int                backlog;
+        int                timeout;
+        int                rc;
+        kra_connreq_t     *connreqs;
+        char               name[16];
+
+        /* Parent thread holds kra_nid_mutex, and is, or is about to
+         * block on kra_listener_signal */
+
+        port = kranal_tunables.kra_port;
+        snprintf(name, sizeof(name), "kranal_lstn%03d", port);
+        kportal_daemonize(name);
+        kportal_blockallsigs();
+
+        init_waitqueue_entry(&wait, current);
+
+        rc = -ENOMEM;
+        PORTAL_ALLOC(connreqs, 2 * sizeof(*connreqs));
+        if (connreqs == NULL)
+                goto out_0;
+
+        rc = kranal_create_sock(&sock);
+        if (rc !=
0)
+                goto out_1;
+
+        memset(&addr, 0, sizeof(addr));
+        addr.sin_family      = AF_INET;
+        addr.sin_port        = htons(port);
+        addr.sin_addr.s_addr = htonl(INADDR_ANY);
+
+        rc = sock->ops->bind(sock, (struct sockaddr *)&addr, sizeof(addr));
+        if (rc != 0) {
+                CERROR("Can't bind to port %d: %d\n", port, rc);
+                goto out_2;
+        }
+
+        rc = sock->ops->listen(sock, kranal_tunables.kra_backlog);
+        if (rc != 0) {
+                CERROR("Can't set listen backlog %d: %d\n",
+                       kranal_tunables.kra_backlog, rc);
+                goto out_2;
+        }
+
+        LASSERT (kranal_data.kra_listener_sock == NULL);
+        kranal_data.kra_listener_sock = sock;
+
+        /* unblock waiting parent */
+        LASSERT (kranal_data.kra_listener_shutdown == 0);
+        up(&kranal_data.kra_listener_signal);
+
+        /* Wake me any time something happens on my socket */
+        add_wait_queue(sock->sk->sk_sleep, &wait);
+
+        while (kranal_data.kra_listener_shutdown == 0) {
+
+                newsock = sock_alloc();
+                if (newsock == NULL) {
+                        CERROR("Can't allocate new socket for accept\n");
+                        kranal_pause(HZ);
+                        continue;
+                }
+
+                set_current_state(TASK_INTERRUPTIBLE);
+
+                rc = sock->ops->accept(sock, newsock, O_NONBLOCK);
+
+                if (rc == -EAGAIN &&
+                    kranal_data.kra_listener_shutdown == 0)
+                        schedule();
+
+                set_current_state(TASK_RUNNING);
+
+                if (rc != 0) {
+                        sock_release(newsock);
+                        if (rc != -EAGAIN) {
+                                CERROR("Accept failed: %d\n", rc);
+                                kranal_pause(HZ);
+                        }
+                        continue;
+                }
+
+                kranal_conn_handshake(newsock, NULL);
+                sock_release(newsock);
+        }
+
+        rc = 0;
+        remove_wait_queue(sock->sk->sk_sleep, &wait);
+ out_2:
+        sock_release(sock);
+        kranal_data.kra_listener_sock = NULL;
+ out_1:
+        PORTAL_FREE(connreqs, 2 * sizeof(*connreqs));
+ out_0:
+        /* set completion status and unblock thread waiting for me
+         * (parent on startup failure, executioner on normal shutdown) */
+        kranal_data.kra_listener_shutdown = rc;
+        up(&kranal_data.kra_listener_signal);
+
+        return 0;
+}
+
+int
+kranal_start_listener ()
+{
+        long    pid;
+        int     rc;
+
+        CDEBUG(D_WARNING, "Starting listener\n");
+
+        /* Called holding kra_nid_mutex: listener stopped */
+        LASSERT (kranal_data.kra_listener_sock == NULL);
+
+        kranal_data.kra_listener_shutdown = 0;
+        pid = kernel_thread(kranal_listener, NULL, 0);
+        if (pid < 0) {
+                CERROR("Can't spawn listener: %ld\n", pid);
+                return (int)pid;
+        }
+
+        /* Block until listener has started up. */
+        down(&kranal_data.kra_listener_signal);
+
+        rc = kranal_data.kra_listener_shutdown;
+        LASSERT ((rc != 0) == (kranal_data.kra_listener_sock == NULL));
+
+        if (rc == 0)
+                CDEBUG(D_WARNING, "Listener %ld started OK\n", pid);
+        return rc;
+}
+
+void
+kranal_stop_listener()
+{
+        CDEBUG(D_WARNING, "Stopping listener\n");
+
+        /* Called holding kra_nid_mutex: listener running */
+        LASSERT (kranal_data.kra_listener_sock != NULL);
+
+        kranal_data.kra_listener_shutdown = 1;
+        wake_up_all(kranal_data.kra_listener_sock->sk->sk_sleep);
+
+        /* Block until listener has torn down.
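+         * The listener thread does its final up() on kra_listener_signal
+         * only after it has released its socket and cleared
+         * kra_listener_sock, so this down() cannot return before teardown
+         * is really complete.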
*/ + down(&kranal_data.kra_listener_signal); + + LASSERT (kranal_data.kra_listener_sock == NULL); + CDEBUG(D_WARNING, "Listener stopped\n"); +} + +int +kranal_listener_procint(ctl_table *table, int write, struct file *filp, + void *buffer, size_t *lenp) +{ + int *tunable = (int *)table->data; + int old_val; + int rc; + + down(&kranal_data.kra_nid_mutex); + + LASSERT (tunable == &kranal_data.kra_port || + tunable == &kranal_data.kra_backlog); + old_val = *tunable; + + rc = proc_dointvec(table, write, filp, buffer, lenp); + + if (write && + (*tunable != old_val || + kranal_data.kra_listener_sock == NULL)) { + + if (kranal_data.kra_listener_sock != NULL) + kranal_stop_listener(); + + rc = kranal_start_listener(); + + if (rc != 0) { + *tunable = old_val; + kranal_start_listener(); + } + } + + up(&kranal_data.kra_nid_mutex); + return rc; +} + +int +kranal_set_mynid(ptl_nid_t nid) +{ + lib_ni_t *ni = &kranal_lib.libnal_ni; + int rc; + + CDEBUG(D_NET, "setting mynid to "LPX64" (old nid="LPX64")\n", + nid, ni->ni_pid.nid); + + down(&kranal_data.kra_nid_mutex); + + if (nid == ni->ni_pid.nid) { + /* no change of NID */ + up(&kranal_data.kra_nid_mutex); + return 0; + } + + if (kranal_data.kra_listener_sock != NULL) + kranal_stop_listener(); + + ni->ni_pid.nid = nid; + + /* Delete all existing peers and their connections after new + * NID/incarnation set to ensure no old connections in our brave + * new world. */ + kranal_del_peer(PTL_NID_ANY, 0); + + if (nid != PTL_NID_ANY) + rc = kranal_start_listener(); + + up(&kranal_data.kra_nid_mutex); + return rc; +} + +kra_peer_t * +kranal_create_peer (ptl_nid_t nid) +{ + kra_peer_t *peer; + + LASSERT (nid != PTL_NID_ANY); + + PORTAL_ALLOC(peer, sizeof(*peer)); + if (peer == NULL) + return NULL; + + memset(peer, 0, sizeof(*peer)); /* zero flags etc */ + + peer->rap_nid = nid; + atomic_set(&peer->rap_refcount, 1); /* 1 ref for caller */ + + INIT_LIST_HEAD(&peer->rap_list); /* not in the peer table yet */ + INIT_LIST_HEAD(&peer->rap_conns); + INIT_LIST_HEAD(&peer->rap_tx_queue); + + peer->rap_reconnect_time = CURRENT_TIME; + peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL; + + atomic_inc(&kranal_data.kra_npeers); + return peer; +} + +void +__kranal_peer_decref (kra_peer_t *peer) +{ + CDEBUG(D_NET, "peer "LPX64" %p deleted\n", peer->rap_nid, peer); + + LASSERT (atomic_read(&peer->rap_refcount) == 0); + LASSERT (peer->rap_persistence == 0); + LASSERT (!kranal_peer_active(peer)); + LASSERT (peer->rap_connecting == 0); + LASSERT (list_empty(&peer->rap_conns)); + LASSERT (list_empty(&peer->rap_tx_queue)); + + PORTAL_FREE(peer, sizeof(*peer)); + + /* NB a peer's connections keep a reference on their peer until + * they are destroyed, so we can be assured that _all_ state to do + * with this peer has been cleaned up when its refcount drops to + * zero. 
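+         * That is also why kra_npeers is only decremented below, after the
+         * last reference has gone: shutdown polls it to learn when all peer
+         * state has really been cleaned up.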
*/ + atomic_dec(&kranal_data.kra_npeers); +} + +kra_peer_t * +kranal_find_peer_locked (ptl_nid_t nid) +{ + struct list_head *peer_list = kranal_nid2peerlist(nid); + struct list_head *tmp; + kra_peer_t *peer; + + list_for_each (tmp, peer_list) { + + peer = list_entry(tmp, kra_peer_t, rap_list); + + LASSERT (peer->rap_persistence > 0 || /* persistent peer */ + !list_empty(&peer->rap_conns)); /* active conn */ + + if (peer->rap_nid != nid) + continue; + + CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n", + peer, nid, atomic_read(&peer->rap_refcount)); + return peer; + } + return NULL; +} + +kra_peer_t * +kranal_find_peer (ptl_nid_t nid) +{ + kra_peer_t *peer; + + read_lock(&kranal_data.kra_global_lock); + peer = kranal_find_peer_locked(nid); + if (peer != NULL) /* +1 ref for caller? */ + kranal_peer_addref(peer); + read_unlock(&kranal_data.kra_global_lock); + + return peer; +} + +void +kranal_unlink_peer_locked (kra_peer_t *peer) +{ + LASSERT (peer->rap_persistence == 0); + LASSERT (list_empty(&peer->rap_conns)); + + LASSERT (kranal_peer_active(peer)); + list_del_init(&peer->rap_list); + + /* lose peerlist's ref */ + kranal_peer_decref(peer); +} + +int +kranal_get_peer_info (int index, ptl_nid_t *nidp, int *portp, int *persistencep) +{ + kra_peer_t *peer; + struct list_head *ptmp; + int i; + + read_lock(&kranal_data.kra_global_lock); + + for (i = 0; i < kranal_data.kra_peer_hash_size; i++) { + + list_for_each(ptmp, &kranal_data.kra_peers[i]) { + + peer = list_entry(ptmp, kra_peer_t, rap_list); + LASSERT (peer->rap_persistence > 0 || + !list_empty(&peer->rap_conns)); + + if (index-- > 0) + continue; + + *nidp = peer->rap_nid; + *portp = peer->rap_port; + *persistencep = peer->rap_persistence; + + read_unlock(&kranal_data.kra_global_lock); + return 0; + } + } + + read_unlock(&kranal_data.kra_global_lock); + return -ENOENT; +} + +int +kranal_add_persistent_peer (ptl_nid_t nid, __u32 ip, int port) +{ + unsigned long flags; + kra_peer_t *peer; + kra_peer_t *peer2; + + if (nid == PTL_NID_ANY) + return -EINVAL; + + peer = kranal_create_peer(nid); + if (peer == NULL) + return -ENOMEM; + + write_lock_irqsave(&kranal_data.kra_global_lock, flags); + + peer2 = kranal_find_peer_locked(nid); + if (peer2 != NULL) { + kranal_put_peer(peer); + peer = peer2; + } else { + /* peer table takes existing ref on peer */ + list_add_tail(&peer->rap_list, + kranal_nid2peerlist(nid)); + } + + peer->rap_ip = ip; + peer->rap_port = port; + peer->rap_persistence++; + + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); + return 0; +} + +void +kranal_del_peer_locked (kra_peer_t *peer, int single_share) +{ + struct list_head *ctmp; + struct list_head *cnxt; + kra_conn_t *conn; + + if (!single_share) + peer->rap_persistence = 0; + else if (peer->rap_persistence > 0) + peer->rap_persistence--; + + if (peer->rap_persistence != 0) + return; + + if (list_empty(&peer->rap_conns)) { + kranal_unlink_peer_locked(peer); + } else { + list_for_each_safe(ctmp, cnxt, &peer->rap_conns) { + conn = list_entry(ctmp, kra_conn_t, rac_list); + + kranal_close_conn_locked(conn, 0); + } + /* peer unlinks itself when last conn is closed */ + } +} + +int +kranal_del_peer (ptl_nid_t nid, int single_share) +{ + unsigned long flags; + struct list_head *ptmp; + struct list_head *pnxt; + kra_peer_t *peer; + int lo; + int hi; + int i; + int rc = -ENOENT; + + write_lock_irqsave(&kranal_data.kra_global_lock, flags); + + if (nid != PTL_NID_ANY) + lo = hi = kranal_nid2peerlist(nid) - kranal_data.kra_peers; + else { + lo = 0; + hi = 
kranal_data.kra_peer_hash_size - 1; + } + + for (i = lo; i <= hi; i++) { + list_for_each_safe (ptmp, pnxt, &kranal_data.kra_peers[i]) { + peer = list_entry(ptmp, kra_peer_t, rap_list); + LASSERT (peer->rap_persistence > 0 || + !list_empty(&peer->rap_conns)); + + if (!(nid == PTL_NID_ANY || peer->rap_nid == nid)) + continue; + + kranal_del_peer_locked(peer, single_share); + rc = 0; /* matched something */ + + if (single_share) + goto out; + } + } + out: + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); + + return rc; +} + +kra_conn_t * +kranal_get_conn_by_idx (int index) +{ + kra_peer_t *peer; + struct list_head *ptmp; + kra_conn_t *conn; + struct list_head *ctmp; + int i; + + read_lock (&kranal_data.kra_global_lock); + + for (i = 0; i < kranal_data.kra_peer_hash_size; i++) { + list_for_each (ptmp, &kranal_data.kra_peers[i]) { + + peer = list_entry(ptmp, kra_peer_t, rap_list); + LASSERT (peer->rap_persistence > 0 || + !list_empty(&peer->rap_conns)); + + list_for_each (ctmp, &peer->rap_conns) { + if (index-- > 0) + continue; + + conn = list_entry(ctmp, kra_conn_t, rac_list); + CDEBUG(D_NET, "++conn[%p] -> "LPX64" (%d)\n", + conn, conn->rac_peer->rap_nid, + atomic_read(&conn->rac_refcount)); + atomic_inc(&conn->rac_refcount); + read_unlock(&kranal_data.kra_global_lock); + return conn; + } + } + } + + read_unlock(&kranal_data.kra_global_lock); + return NULL; +} + +int +kranal_close_peer_conns_locked (kra_peer_t *peer, int why) +{ + kra_conn_t *conn; + struct list_head *ctmp; + struct list_head *cnxt; + int count = 0; + + list_for_each_safe (ctmp, cnxt, &peer->rap_conns) { + conn = list_entry(ctmp, kra_conn_t, rac_list); + + count++; + kranal_close_conn_locked(conn, why); + } + + return count; +} + +int +kranal_close_stale_conns_locked (kra_peer_t *peer, __u64 incarnation) +{ + kra_conn_t *conn; + struct list_head *ctmp; + struct list_head *cnxt; + int count = 0; + + list_for_each_safe (ctmp, cnxt, &peer->rap_conns) { + conn = list_entry(ctmp, kra_conn_t, rac_list); + + if (conn->rac_incarnation == incarnation) + continue; + + CDEBUG(D_NET, "Closing stale conn nid:"LPX64" incarnation:"LPX64"("LPX64")\n", + peer->rap_nid, conn->rac_incarnation, incarnation); + LASSERT (conn->rac_incarnation < incarnation); + + count++; + kranal_close_conn_locked(conn, -ESTALE); + } + + return count; +} + +int +kranal_close_matching_conns (ptl_nid_t nid) +{ + unsigned long flags; + kra_peer_t *peer; + struct list_head *ptmp; + struct list_head *pnxt; + int lo; + int hi; + int i; + int count = 0; + + write_lock_irqsave(&kranal_data.kra_global_lock, flags); + + if (nid != PTL_NID_ANY) + lo = hi = kranal_nid2peerlist(nid) - kranal_data.kra_peers; + else { + lo = 0; + hi = kranal_data.kra_peer_hash_size - 1; + } + + for (i = lo; i <= hi; i++) { + list_for_each_safe (ptmp, pnxt, &kranal_data.kra_peers[i]) { + + peer = list_entry(ptmp, kra_peer_t, rap_list); + LASSERT (peer->rap_persistence > 0 || + !list_empty(&peer->rap_conns)); + + if (!(nid == PTL_NID_ANY || nid == peer->rap_nid)) + continue; + + count += kranal_close_peer_conns_locked(peer, 0); + } + } + + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); + + /* wildcards always succeed */ + if (nid == PTL_NID_ANY) + return 0; + + return (count == 0) ? 
-ENOENT : 0;
+}
+
+int
+kranal_cmd(struct portals_cfg *pcfg, void * private)
+{
+        int rc = -EINVAL;
+
+        LASSERT (pcfg != NULL);
+
+        switch(pcfg->pcfg_command) {
+        case NAL_CMD_GET_PEER: {
+                ptl_nid_t   nid = 0;
+                __u32       ip = 0;
+                int         port = 0;
+                int         share_count = 0;
+
+                rc = kranal_get_peer_info(pcfg->pcfg_count,
+                                          &nid, &ip, &port, &share_count);
+                pcfg->pcfg_nid   = nid;
+                pcfg->pcfg_size  = 0;
+                pcfg->pcfg_id    = ip;
+                pcfg->pcfg_misc  = port;
+                pcfg->pcfg_count = 0;
+                pcfg->pcfg_wait  = share_count;
+                break;
+        }
+        case NAL_CMD_ADD_PEER: {
+                rc = kranal_add_persistent_peer(pcfg->pcfg_nid,
+                                                pcfg->pcfg_id, /* IP */
+                                                pcfg->pcfg_misc); /* port */
+                break;
+        }
+        case NAL_CMD_DEL_PEER: {
+                rc = kranal_del_peer(pcfg->pcfg_nid,
+                                     /* flags == single_share */
+                                     pcfg->pcfg_flags != 0);
+                break;
+        }
+        case NAL_CMD_GET_CONN: {
+                kra_conn_t *conn = kranal_get_conn_by_idx(pcfg->pcfg_count);
+
+                if (conn == NULL)
+                        rc = -ENOENT;
+                else {
+                        rc = 0;
+                        pcfg->pcfg_nid   = conn->rac_peer->rap_nid;
+                        pcfg->pcfg_id    = 0;
+                        pcfg->pcfg_misc  = 0;
+                        pcfg->pcfg_flags = 0;
+                        kranal_conn_decref(conn);
+                }
+                break;
+        }
+        case NAL_CMD_CLOSE_CONNECTION: {
+                rc = kranal_close_matching_conns(pcfg->pcfg_nid);
+                break;
+        }
+        case NAL_CMD_REGISTER_MYNID: {
+                if (pcfg->pcfg_nid == PTL_NID_ANY)
+                        rc = -EINVAL;
+                else
+                        rc = kranal_set_mynid(pcfg->pcfg_nid);
+                break;
+        }
+        }
+
+        return rc;
+}
+
+void
+kranal_free_txdescs(struct list_head *freelist)
+{
+        kra_tx_t    *tx;
+
+        while (!list_empty(freelist)) {
+                tx = list_entry(freelist->next, kra_tx_t, tx_list);
+
+                list_del(&tx->tx_list);
+                PORTAL_FREE(tx->tx_phys, PTL_MD_MAX_IOV * sizeof(*tx->tx_phys));
+                PORTAL_FREE(tx, sizeof(*tx));
+        }
+}
+
+int
+kranal_alloc_txdescs(struct list_head *freelist, int n)
+{
+        int            isnblk = (freelist == &kranal_data.kra_idle_nblk_txs);
+        int            i;
+        kra_tx_t      *tx;
+
+        LASSERT (freelist == &kranal_data.kra_idle_txs ||
+                 freelist == &kranal_data.kra_idle_nblk_txs);
+        LASSERT (list_empty(freelist));
+
+        for (i = 0; i < n; i++) {
+
+                PORTAL_ALLOC(tx, sizeof(*tx));
+                if (tx == NULL) {
+                        CERROR("Can't allocate %stx[%d]\n",
+                               isnblk ? "nblk " : "", i);
+                        kranal_free_txdescs(freelist);
+                        return -ENOMEM;
+                }
+
+                PORTAL_ALLOC(tx->tx_phys,
+                             PTL_MD_MAX_IOV * sizeof(*tx->tx_phys));
+                if (tx->tx_phys == NULL) {
+                        CERROR("Can't allocate %stx[%d]->tx_phys\n",
+                               isnblk ?
"nblk ", i); + + PORTAL_FREE(tx, sizeof(*tx)); + kranal_free_txdescs(freelist); + return -ENOMEM; + } + + tx->tx_isnblk = isnblk + tx->tx_buftype = RANAL_BUF_NONE; + + list_add(&tx->tx_list, freelist); + } + + return 0; +} + +int +kranal_device_init(int id, kra_device_t *dev) +{ + const int total_ntx = RANAL_NTX + RANAL_NTX_NBLK; + RAP_RETURN rrc; + + dev->rad_id = id; + rrc = RapkGetDeviceByIndex(id, NULL, kranal_device_callback, + &dev->rad_handle); + if (rrc != RAP_SUCCESS) { + CERROR("Can't get Rapidarray Device %d: %d\n", idx, rrc); + goto failed_0; + } + + rrc = RapkReserveRdma(dev->rad_handle, total_ntx); + if (rrc != RAP_SUCCESS) { + CERROR("Can't reserve %d RDMA descriptors" + " for device[%d]: %d\n", total_ntx, i, rrc); + goto failed_1; + } + + rrc = RapkCreatePtag(dev->rad_handle, + &dev->rad_ptag); + if (rrc != RAP_SUCCESS) { + CERROR("Can't create ptag" + " for device[%d]: %d\n", i, rrc); + goto failed_1; + } + + rrc = RapkCreateCQ(dev->rad_handle, total_ntx, dev->rad_ptag, + &dev->rad_rdma_cq); + if (rrc != RAP_SUCCESS) { + CERROR("Can't create rdma cq size %d" + " for device[%d]: %d\n", total_ntx, i, rrc); + goto failed_2; + } + + rrc = RapkCreateCQ(dev->rad_handle, RANAL_FMA_CQ_SIZE, + dev->rad_ptag, &dev->rad_fma_cq); + if (rrc != RAP_SUCCESS) { + CERROR("Can't create fma cq size %d" + " for device[%d]: %d\n", RANAL_RX_CQ_SIZE, i, rrc); + goto failed_3; + } + + return 0; + + failed_3: + RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag); + failed_2: + RapkDestroyPtag(dev->rad_handle, dev->rad_ptag); + failed_1: + RapkReleaseDevice(dev->rad_handle); + failed_0: + return -ENODEV; +} + +void +kranal_device_fini(kra_device_t *dev) +{ + RapkDestroyCQ(dev->rad_handle, dev->rad_rx_cq, dev->rad_ptag); + RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag); + RapkDestroyPtag(dev->rad_handle, dev->rad_ptag); + RapkReleaseDevice(dev->rad_handle); +} + +void +kranal_api_shutdown (nal_t *nal) +{ + int i; + int rc; + unsigned long flags; + + if (nal->nal_refct != 0) { + /* This module got the first ref */ + PORTAL_MODULE_UNUSE; + return; + } + + CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n", + atomic_read(&portal_kmemory)); + + LASSERT (nal == &kranal_api); + + switch (kranal_data.kra_init) { + default: + CERROR("Unexpected state %d\n", kranal_data.kra_init); + LBUG(); + + case RANAL_INIT_ALL: + /* stop calls to nal_cmd */ + libcfs_nal_cmd_unregister(OPENRANAL); + /* No new persistent peers */ + + /* resetting my NID to unadvertises me, removes my + * listener and nukes all current peers */ + kranal_set_mynid(PTL_NID_ANY); + /* no new peers or conns */ + + /* Wait for all peer/conn state to clean up */ + i = 2; + while (atomic_read(&kranal_data.kra_nconns) != 0 || + atomic_read(&kranal-data.kra_npeers) != 0) { + i++; + CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? 
*/ + "waiting for %d peers and %d conns to close down\n", + atomic_read(&kranal_data.kra_npeers), + atomic_read(&kranal_data.kra_nconns)); + kranal_pause(HZ); + } + /* fall through */ + + case RANAL_INIT_LIB: + lib_fini(&kranal_lib); + /* fall through */ + + case RANAL_INIT_DATA: + break; + } + + /* flag threads to terminate; wake and wait for them to die */ + kranal_data.kra_shutdown = 1; + + for (i = 0; i < kranal_data.kra_ndevs; i++) { + kra_device_t *dev = &kranal_data.kra_devices[i]; + + LASSERT (list_empty(&dev->rad_connq)); + + spin_lock_irqsave(&dev->rad_lock, flags); + wake_up(&dev->rad_waitq); + spin_unlock_irqrestore(&dev->rad_lock, flags); + } + + spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags); + wake_up_all(&kranal_data.kra_reaper_waitq); + spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags); + + LASSERT (list_empty(&kranal_data.kra_connd_peers)); + spin_lock_irqsave(&kranal-data.kra_connd_lock, flags); + wake_up_all(&kranal_data.kra_connd_waitq); + spin_unlock_irqrestore(&kranal-data.kra_connd_lock, flags); + + i = 2; + while (atomic_read(&kranal_data.kra_nthreads) != 0) { + i++; + CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */ + "Waiting for %d threads to terminate\n", + atomic_read(&kranal_data.kra_nthreads)); + kranal_pause(HZ); + } + + LASSERT (atomic_read(&kranal_data.kra_npeers) == 0); + if (kranal_data.kra_peers != NULL) { + for (i = 0; i < kranal_data.kra_peer_hash_size; i++) + LASSERT (list_empty(&kranal_data.kra_peers[i])); + + PORTAL_FREE(kranal_data.kra_peers, + sizeof (struct list_head) * + kranal_data.kra_peer_hash_size); + } + + LASSERT (atomic_read(&kranal_data.kra_nconns) == 0); + if (kranal_data.kra_conns != NULL) { + for (i = 0; i < kranal_data.kra_conn_hash_size; i++) + LASSERT (list_empty(&kranal_data.kra_conns[i])); + + PORTAL_FREE(kranal_data.kra_conns, + sizeof (struct list_head) * + kranal_data.kra_conn_hash_size); + } + + for (i = 0; i < kranal_data.kra_ndevs; i++) + kranal_device_fini(&kranal_data.kra_devices[i]); + + kranal_free_txdescs(&kranal_data.kra_idle_txs); + kranal_free_txdescs(&kranal_data.kra_idle_nblk_txs); + + CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n", + atomic_read(&portal_kmemory)); + printk(KERN_INFO "Lustre: RapidArray NAL unloaded (final mem %d)\n", + atomic_read(&portal_kmemory)); + + kranal_data.kra_init = RANAL_INIT_NOTHING; +} + +int +kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid, + ptl_ni_limits_t *requested_limits, + ptl_ni_limits_t *actual_limits) +{ + static int device_ids[] = {RAPK_MAIN_DEVICE_ID, + RAPK_EXPANSION_DEVICE_ID}; + struct timeval tv; + ptl_process_id_t process_id; + int pkmem = atomic_read(&portal_kmemory); + int rc; + int i; + kra_device_t *dev; + + LASSERT (nal == &kranal_api); + + if (nal->nal_refct != 0) { + if (actual_limits != NULL) + *actual_limits = kranal_lib.libnal_ni.ni_actual_limits; + /* This module got the first ref */ + PORTAL_MODULE_USE; + return PTL_OK; + } + + LASSERT (kranal_data.kra_init == RANAL_INIT_NOTHING); + + memset(&kranal_data, 0, sizeof(kranal_data)); /* zero pointers, flags etc */ + + /* CAVEAT EMPTOR: Every 'Fma' message includes the sender's NID and + * a unique (for all time) incarnation so we can uniquely identify + * the sender. The incarnation is an incrementing counter + * initialised with seconds + microseconds at startup time. So we + * rely on NOT creating connections more frequently on average than + * 1MHz to ensure we don't use old incarnations when we reboot. 
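+         * (For example, booting at tv_sec = 1100000000 seeds the counter at
+         * 1100000000000000; values handed out before a reboot are only
+         * reached again if connections were created faster than one per
+         * microsecond on average.)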
*/ + do_gettimeofday(&tv); + kranal_data.kra_next_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec; + + init_MUTEX(&kranal_data.kra_nid_mutex); + init_MUTEX_LOCKED(&kranal_data.kra_listener_signal); + + rwlock_init(&kranal_data.kra_global_lock); + + for (i = 0; i < RANAL_MAXDEVS; i++ ) { + kra_device_t *dev = &kranal_data.kra_devices[i]; + + dev->rad_idx = i; + INIT_LIST_HEAD(&dev->rad_connq); + init_waitqueue_head(&dev->rad_waitq); + spin_lock_init(&dev->rad_lock); + } + + init_waitqueue_head(&kranal_data.kra_reaper_waitq); + spin_lock_init(&kranal_data.kra_reaper_lock); + + INIT_LIST_HEAD(&kranal_data.kra_connd_peers); + init_waitqueue_head(&kranal_data.kra_connd_waitq); + spin_lock_init(&kranal_data.kra_connd_lock); + + INIT_LIST_HEAD(&kranal_data.kra_idle_txs); + INIT_LIST_HEAD(&kranal_data.kra_idle_nblk_txs); + init_waitqueue_head(&kranal_data.kra_idle_tx_waitq); + spin_lock_init(&kranal_data.kra_tx_lock); + + /* OK to call kranal_api_shutdown() to cleanup now */ + kranal_data.kra_init = RANAL_INIT_DATA; + + kranal_data.kra_peer_hash_size = RANAL_PEER_HASH_SIZE; + PORTAL_ALLOC(kranal_data.kra_peers, + sizeof(struct list_head) * kranal_data.kra_peer_hash_size); + if (kranal_data.kra_peers == NULL) + goto failed; + + for (i = 0; i < kranal_data.kra_peer_hash_size; i++) + INIT_LIST_HEAD(&kranal_data.kra_peers[i]); + + kranal_data.kra_conn_hash_size = RANAL_PEER_HASH_SIZE; + PORTAL_ALLOC(kranal_data.kra_conns, + sizeof(struct list_head) * kranal_data.kra_conn_hash_size); + if (kranal_data.kra_conns == NULL) + goto failed; + + for (i = 0; i < kranal_data.kra_conn_hash_size; i++) + INIT_LIST_HEAD(&kranal_data.kra_conns[i]); + + rc = kranal_alloc_txdescs(&kranal_data.kra_idle_txs, RANAL_NTX); + if (rc != 0) + goto failed; + + rc = kranal_alloc_txdescs(&kranal_data.kra_idle_nblk_txs,RANAL_NTX_NBLK); + if (rc != 0) + goto failed; + + process_id.pid = requested_pid; + process_id.nid = PTL_NID_ANY; /* don't know my NID yet */ + + rc = lib_init(&kranal_lib, nal, process_id, + requested_limits, actual_limits); + if (rc != PTL_OK) { + CERROR("lib_init failed: error %d\n", rc); + goto failed; + } + + /* lib interface initialised */ + kranal_data.kra_init = RANAL_INIT_LIB; + /*****************************************************/ + + rc = kranal_thread_start(kranal_reaper, NULL); + if (rc != 0) { + CERROR("Can't spawn ranal reaper: %d\n", rc); + goto failed; + } + + for (i = 0; i < RANAL_N_CONND; i++) { + rc = kranal_thread_start(kranal_connd, (void *)i); + if (rc != 0) { + CERROR("Can't spawn ranal connd[%d]: %d\n", + i, rc); + goto failed; + } + } + + LASSERT(kranal_data.kra_ndevs == 0); + for (i = 0; i < sizeof(device_ids)/sizeof(device_ids[0]); i++) { + dev = &kranal_data.kra_devices[kranal_data.kra_ndevs]; + + rc = kranal_device_init(device_ids[i], dev); + if (rc == 0) + kranal_data.kra_ndevs++; + + rc = kranal_thread_start(kranal_scheduler, dev); + if (rc != 0) { + CERROR("Can't spawn ranal scheduler[%d]: %d\n", + i, rc); + goto failed; + } + } + + if (kranal_data.kra_ndevs == 0) + goto failed; + + rc = libcfs_nal_cmd_register(OPENRANAL, &kranal_cmd, NULL); + if (rc != 0) { + CERROR("Can't initialise command interface (rc = %d)\n", rc); + goto failed; + } + + /* flag everything initialised */ + kranal_data.kra_init = RANAL_INIT_ALL; + /*****************************************************/ + + CDEBUG(D_MALLOC, "initial kmem %d\n", atomic_read(&portal_kmemory)); + printk(KERN_INFO "Lustre: RapidArray NAL loaded " + "(initial mem %d)\n", pkmem); + + return PTL_OK; + + failed: + 
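+        /* kra_init records how far initialisation got, so the shutdown call
+         * below only tears down the state that was actually set up above. */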
kranal_api_shutdown(&kranal_api); + return PTL_FAIL; +} + +void __exit +kranal_module_fini (void) +{ +#ifdef CONFIG_SYSCTL + if (kranal_tunables.kra_sysctl != NULL) + unregister_sysctl_table(kranal_tunables.kra_sysctl); +#endif + PtlNIFini(kranal_ni); + + ptl_unregister_nal(OPENRANAL); +} + +int __init +kranal_module_init (void) +{ + int rc; + + /* the following must be sizeof(int) for + * proc_dointvec/kranal_listener_procint() */ + LASSERT (sizeof(kranal_tunables.kra_timeout) == sizeof(int)); + LASSERT (sizeof(kranal_tunables.kra_listener_timeout) == sizeof(int)); + LASSERT (sizeof(kranal_tunables.kra_backlog) == sizeof(int)); + LASSERT (sizeof(kranal_tunables.kra_port) == sizeof(int)); + LASSERT (sizeof(kranal_tunables.kra_max_immediate) == sizeof(int)); + + kranal_api.nal_ni_init = kranal_api_startup; + kranal_api.nal_ni_fini = kranal_api_shutdown; + + /* Initialise dynamic tunables to defaults once only */ + kranal_tunables.kra_timeout = RANAL_TIMEOUT; + + rc = ptl_register_nal(OPENRANAL, &kranal_api); + if (rc != PTL_OK) { + CERROR("Can't register RANAL: %d\n", rc); + return -ENOMEM; /* or something... */ + } + + /* Pure gateways want the NAL started up at module load time... */ + rc = PtlNIInit(OPENRANAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &kranal_ni); + if (rc != PTL_OK && rc != PTL_IFACE_DUP) { + ptl_unregister_nal(OPENRANAL); + return -ENODEV; + } + +#ifdef CONFIG_SYSCTL + /* Press on regardless even if registering sysctl doesn't work */ + kranal_tunables.kra_sysctl = + register_sysctl_table(kranal_top_ctl_table, 0); +#endif + return 0; +} + +MODULE_AUTHOR("Cluster File Systems, Inc. "); +MODULE_DESCRIPTION("Kernel RapidArray NAL v0.01"); +MODULE_LICENSE("GPL"); + +module_init(kranal_module_init); +module_exit(kranal_module_fini); diff --git a/lustre/portals/knals/ranal/ranal.h b/lustre/portals/knals/ranal/ranal.h new file mode 100644 index 0000000..c134179 --- /dev/null +++ b/lustre/portals/knals/ranal/ranal.h @@ -0,0 +1,438 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2004 Cluster File Systems, Inc. + * Author: Eric Barton + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ + +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#define DEBUG_SUBSYSTEM S_NAL + +#include +#include +#include +#include + +#include + +#if CONFIG_SMP +# define RANAL_N_SCHED num_online_cpus() /* # schedulers */ +#else +# define RANAL_N_SCHED 1 /* # schedulers */ +#endif + +#define RANAL_MAXDEVS 2 /* max # devices RapidArray supports */ + +#define RANAL_N_CONND 4 /* # connection daemons */ + +#define RANAL_MIN_RECONNECT_INTERVAL 1 /* first failed connection retry (seconds)... 
*/ +#define RANAL_MAX_RECONNECT_INTERVAL 60 /* ...exponentially increasing to this */ + +#define RANAL_FMA_PREFIX_LEN 232 /* size of FMA "Prefix" */ +#define RANAL_FMA_MAX_DATA_LEN ((7<<10)-256) /* Max FMA MSG is 7K including prefix */ + +#define RANAL_PEER_HASH_SIZE 101 /* # peer lists */ +#define RANAL_CONN_HASH_SIZE 101 /* # conn lists */ + +#define RANAL_NTX 64 /* # tx descs */ +#define RANAL_NTX_NBLK 256 /* # reserved tx descs */ + +#define RANAL_RX_CQ_SIZE 1024 /* # entries in receive CQ + * (overflow is a performance hit) */ + +#define RANAL_RESCHED 100 /* # scheduler loops before reschedule */ + +#define RANAL_MIN_TIMEOUT 5 /* minimum timeout interval (seconds) */ +#define RANAL_TIMEOUT2KEEPALIVE(t) (((t)+1)/2) /* timeout -> keepalive interval */ + +/* default vals for runtime tunables */ +#define RANAL_TIMEOUT 30 /* comms timeout (seconds) */ +#define RANAL_LISTENER_TIMEOUT 5 /* listener timeout (seconds) */ +#define RANAL_MAX_IMMEDIATE (2<<10) /* biggest immediate payload */ + +typedef struct +{ + int kra_timeout; /* comms timeout (seconds) */ + int kra_listener_timeout; /* max time the listener can block */ + int kra_backlog; /* listener's backlog */ + int kra_port; /* listener's TCP/IP port */ + int kra_max_immediate; /* biggest immediate payload */ + struct ctl_table_header *kra_sysctl; /* sysctl interface */ +} kra_tunables_t; + +typedef struct +{ + RAP_PVOID rad_handle; /* device handle */ + RAP_PROTECTION_HANDLE rad_ptag; /* protection tag */ + RAP_CQ_HANDLE rad_fma_cq; /* FMA (small message) completion queue */ + RAP_CQ_HANDLE rad_rdma_cq; /* rdma completion queue */ + int rad_id; /* device id */ + int rad_idx; /* index in kra_devices */ + int rad_ready; /* set by device callback */ + struct list_head rad_connq; /* connections requiring attention */ + wait_queue_head_t rad_waitq; /* scheduler waits here */ + spinlock_t rad_lock; /* serialise */ +} kra_device_t; + +typedef struct +{ + int kra_init; /* initialisation state */ + int kra_shutdown; /* shut down? 
*/ + atomic_t kra_nthreads; /* # live threads */ + + struct semaphore kra_nid_mutex; /* serialise NID/listener ops */ + struct semaphore kra_listener_signal; /* block for listener startup/shutdown */ + struct socket *kra_listener_sock; /* listener's socket */ + int kra_listener_shutdown; /* ask listener to close */ + + kra_device_t kra_devices[RANAL_MAXDEVS]; /* device/ptag/cq etc */ + int kra_ndevs; /* # devices */ + + rwlock_t kra_global_lock; /* stabilize peer/conn ops */ + + struct list_head *kra_peers; /* hash table of all my known peers */ + int kra_peer_hash_size; /* size of kra_peers */ + atomic_t kra_npeers; /* # peers extant */ + + struct list_head *kra_conns; /* conns hashed by cqid */ + int kra_conn_hash_size; /* size of kra_conns */ + __u64 kra_next_incarnation; /* conn incarnation # generator */ + int kra_next_cqid; /* cqid generator */ + atomic_t kra_nconns; /* # connections extant */ + + long kra_new_min_timeout; /* minimum timeout on any new conn */ + wait_queue_head_t kra_reaper_waitq; /* reaper sleeps here */ + spinlock_t kra_reaper_lock; /* serialise */ + + struct list_head kra_connd_peers; /* peers waiting for a connection */ + wait_queue_head_t kra_connd_waitq; /* connection daemons sleep here */ + spinlock_t kra_connd_lock; /* serialise */ + + struct list_head kra_idle_txs; /* idle tx descriptors */ + struct list_head kra_idle_nblk_txs; /* idle reserved tx descriptors */ + __u64 kra_next_tx_cookie; /* RDMA completion cookie */ + wait_queue_head_t kra_idle_tx_waitq; /* block here for tx descriptor */ + spinlock_t kra_tx_lock; /* serialise */ +} kra_data_t; + +#define RANAL_INIT_NOTHING 0 +#define RANAL_INIT_DATA 1 + +#define RANAL_INIT_ALL 7 + +/************************************************************************ + * Wire message structs. These are sent in sender's byte order + * (i.e. receiver checks magic and flips if required). 
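+ * For example, a receiver that finds ram_magic == __swab32(RANAL_MSG_MAGIC)
+ * knows the sender has the opposite byte order and must __swab*s() every
+ * field before using it, as kranal_recv_connreq() does for connection
+ * requests.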
+ */ + +typedef struct kra_connreq /* connection request/response */ +{ /* (sent via socket) */ + __u32 racr_magic; /* I'm an ranal connreq */ + __u16 racr_version; /* this is my version number */ + __u16 racr_devid; /* which device to connect on */ + __u64 racr_nid; /* my NID */ + __u64 racr_incarnation; /* my incarnation */ + __u32 racr_timeout; /* my timeout */ + RAP_RI_PARAMETERS racr_riparams; /* my endpoint info */ +} kra_connreq_t; + +typedef struct +{ + RAP_MEM_KEY rard_key; + RAP_PVOID64 rard_addr; + RAP_UINT32 rard_nob; +} kra_rdma_desc_t; + +typedef struct +{ + ptl_hdr_t raim_hdr; /* portals header */ + /* Portals payload is in FMA "Message Data" */ +} kra_immediate_msg_t; + +typedef struct +{ + ptl_hdr_t raprm_hdr; /* portals header */ + __u64 raprm_cookie; /* opaque completion cookie */ +} kra_putreq_msg_t; + +typedef struct +{ + __u64 rapam_src_cookie; /* reflected completion cookie */ + __u64 rapam_dst_cookie; /* opaque completion cookie */ + kra_rdma_desc_t rapam_desc; /* sender's sink buffer */ +} kra_putack_msg_t; + +typedef struct +{ + ptl_hdr_t ragm_hdr; /* portals header */ + __u64 ragm_cookie; /* opaque completion cookie */ + kra_rdma_desc_t ragm_desc; /* sender's sink buffer */ +} kra_get_msg_t; + +typedef struct +{ + __u64 racm_cookie; /* reflected completion cookie */ +} kra_completion_msg_t; + +typedef struct /* NB must fit in FMA "Prefix" */ +{ + __u32 ram_magic; /* I'm an ranal message */ + __u16 ram_version; /* this is my version number */ + __u16 ram_type; /* msg type */ + __u64 ram_srcnid; /* sender's NID */ + __u64 ram_incarnation; /* sender's connection incarnation */ + union { + kra_immediate_msg_t immediate; + kra_putreq_msg_t putreq; + kra_putack_msg_t putack; + kra_get_msg_t get; + kra_completion_msg_t completion; + } ram_u; + __u32 ram_seq; /* incrementing sequence number */ +} kra_msg_t; + +#define RANAL_MSG_MAGIC 0x0be91b92 /* unique magic */ +#define RANAL_MSG_VERSION 1 /* current protocol version */ + +#define RANAL_MSG_FENCE 0x80 /* fence RDMA */ + +#define RANAL_MSG_NONE 0x00 /* illegal message */ +#define RANAL_MSG_NOOP 0x01 /* empty ram_u (keepalive) */ +#define RANAL_MSG_IMMEDIATE 0x02 /* ram_u.immediate */ +#define RANAL_MSG_PUT_REQ 0x03 /* ram_u.putreq (src->sink) */ +#define RANAL_MSG_PUT_NAK 0x04 /* ram_u.completion (no PUT match: sink->src) */ +#define RANAL_MSG_PUT_ACK 0x05 /* ram_u.putack (PUT matched: sink->src) */ +#define RANAL_MSG_PUT_DONE 0x86 /* ram_u.completion (src->sink) */ +#define RANAL_MSG_GET_REQ 0x07 /* ram_u.get (sink->src) */ +#define RANAL_MSG_GET_NAK 0x08 /* ram_u.completion (no GET match: src->sink) */ +#define RANAL_MSG_GET_DONE 0x89 /* ram_u.completion (src->sink) */ +#define RANAL_MSG_CLOSE 0x8a /* empty ram_u */ + +/***********************************************************************/ + +typedef struct kra_tx /* message descriptor */ +{ + struct list_head tx_list; /* queue on idle_txs/rac_sendq/rac_waitq */ + struct kra_conn *tx_conn; /* owning conn */ + lib_msg_t *tx_libmsg[2]; /* lib msgs to finalize on completion */ + unsigned long tx_qtime; /* when tx started to wait for something */ + int tx_isnblk; /* I'm reserved for non-blocking sends */ + int tx_nob; /* # bytes of payload */ + int tx_buftype; /* payload buffer type */ + void *tx_buffer; /* source/sink buffer */ + int tx_phys_offset; /* first page offset (if phys) */ + int tx_phys_npages; /* # physical pages */ + RAP_PHYS_REGION *tx_phys; /* page descriptors */ + RAP_MEM_KEY tx_map_key; /* mapping key */ + RAP_RDMA_DESCRIPTOR tx_rdma_desc; /* rdma 
descriptor */ + __u64 tx_cookie; /* identify this tx to peer */ + kra_msg_t tx_msg; /* FMA message buffer */ +} kra_tx_t; + +#define RANAL_BUF_NONE 0 /* buffer type not set */ +#define RANAL_BUF_IMMEDIATE 1 /* immediate data */ +#define RANAL_BUF_PHYS_UNMAPPED 2 /* physical: not mapped yet */ +#define RANAL_BUF_PHYS_MAPPED 3 /* physical: mapped already */ +#define RANAL_BUF_VIRT_UNMAPPED 4 /* virtual: not mapped yet */ +#define RANAL_BUF_VIRT_MAPPED 5 /* virtual: mapped already */ + +#define RANAL_TX_IDLE 0x00 /* on freelist */ +#define RANAL_TX_SIMPLE 0x10 /* about to send a simple message */ +#define RANAL_TX_PUTI_REQ 0x20 /* PUT initiator about to send PUT_REQ */ +#define RANAL_TX_PUTI_WAIT_ACK 0x21 /* PUT initiator waiting for PUT_ACK */ +#define RANAL_TX_PUTI_RDMA 0x22 /* PUT initiator waiting for RDMA to complete */ +#define RANAL_TX_PUTI_DONE 0x23 /* PUT initiator about to send PUT_DONE */ +#define RANAL_TX_PUTT_NAK 0x30 /* PUT target about to send PUT_NAK */ +#define RANAL_TX_PUTT_ACK 0x30 /* PUT target about to send PUT_ACK */ +#define RANAL_TX_PUTT_WAIT_DONE 0x31 /* PUT target waiting for PUT_DONE */ +#define RANAL_TX_GETI_REQ 0x40 /* GET initiator about to send GET_REQ */ +#define RANAL_TX_GETI_WAIT_DONE 0x41 /* GET initiator waiting for GET_DONE */ +#define RANAL_TX_GETT_NAK 0x50 /* GET target about to send PUT_NAK */ +#define RANAL_TX_GETT_RDMA 0x51 /* GET target waiting for RDMA to complete */ +#define RANAL_TX_GETT_DONE 0x52 /* GET target about to send GET_DONE */ + +typedef struct kra_conn +{ + struct kra_peer *rac_peer; /* owning peer */ + struct list_head rac_list; /* stash on peer's conn list */ + struct list_head rac_hashlist; /* stash in connection hash table */ + struct list_head rac_schedlist; /* queue for scheduler */ + struct list_head rac_fmaq; /* txs queued for FMA */ + struct list_head rac_rdmaq; /* txs awaiting RDMA completion */ + struct list_head rac_replyq; /* txs awaiting replies */ + __u64 rac_peer_incarnation; /* peer's unique connection stamp */ + __u64 rac_my_incarnation; /* my unique connection stamp */ + unsigned long rac_last_tx; /* when I last sent an FMA message */ + unsigned long rac_last_rx; /* when I last received an FMA messages */ + long rac_keepalive; /* keepalive interval */ + long rac_timeout; /* infer peer death on (last_rx + timout > now) */ + __u32 rac_cqid; /* my completion callback id (non-unique) */ + __u32 rac_tx_seq; /* tx msg sequence number */ + __u32 rac_rx_seq; /* rx msg sequence number */ + atomic_t rac_refcount; /* # users */ + unsigned int rac_close_sent; /* I've sent CLOSE */ + unsigned int rac_close_recvd; /* I've received CLOSE */ + unsigned int rac_closing; /* connection being torn down */ + unsigned int rac_scheduled; /* being attented to */ + spinlock_t rac_lock; /* serialise */ + kra_device_t *rac_device; /* which device */ + RAP_PVOID rac_rihandle; /* RA endpoint */ + kra_msg_t *rac_rxmsg; /* incoming message (FMA prefix) */ + kra_msg_t rac_msg; /* keepalive/CLOSE message buffer */ +} kra_conn_t; + +typedef struct kra_peer +{ + struct list_head rap_list; /* stash on global peer list */ + struct list_head rap_connd_list; /* schedule on kra_connd_peers */ + struct list_head rap_conns; /* all active connections */ + struct list_head rap_tx_queue; /* msgs waiting for a conn */ + ptl_nid_t rap_nid; /* who's on the other end(s) */ + __u32 rap_ip; /* IP address of peer */ + int rap_port; /* port on which peer listens */ + atomic_t rap_refcount; /* # users */ + int rap_persistence; /* "known" peer refs */ + int 
rap_connecting;         /* connection forming */
+        unsigned long       rap_reconnect_time;     /* CURRENT_TIME when reconnect OK */
+        unsigned long       rap_reconnect_interval; /* exponential backoff */
+} kra_peer_t;
+
+
+extern lib_nal_t       kranal_lib;
+extern kra_data_t      kranal_data;
+extern kra_tunables_t  kranal_tunables;
+
+static inline void
+kranal_peer_addref(kra_peer_t *peer)
+{
+        CDEBUG(D_NET, "%p->"LPX64"\n", peer, peer->rap_nid);
+        LASSERT(atomic_read(&peer->rap_refcount) > 0);
+        atomic_inc(&peer->rap_refcount);
+}
+
+static inline void
+kranal_peer_decref(kra_peer_t *peer)
+{
+        CDEBUG(D_NET, "%p->"LPX64"\n", peer, peer->rap_nid);
+        LASSERT(atomic_read(&peer->rap_refcount) > 0);
+        if (atomic_dec_and_test(&peer->rap_refcount))
+                __kranal_peer_decref(peer);
+}
+
+static inline struct list_head *
+kranal_nid2peerlist (ptl_nid_t nid)
+{
+        unsigned int hash = ((unsigned int)nid) % kranal_data.kra_peer_hash_size;
+
+        return (&kranal_data.kra_peers [hash]);
+}
+
+static inline int
+kranal_peer_active(kra_peer_t *peer)
+{
+        /* Am I in the peer hash table? */
+        return (!list_empty(&peer->rap_list));
+}
+
+static inline void
+kranal_conn_addref(kra_conn_t *conn)
+{
+        CDEBUG(D_NET, "%p->"LPX64"\n", conn, conn->rac_peer->rap_nid);
+        LASSERT(atomic_read(&conn->rac_refcount) > 0);
+        atomic_inc(&conn->rac_refcount);
+}
+
+static inline void
+kranal_conn_decref(kra_conn_t *conn)
+{
+        CDEBUG(D_NET, "%p->"LPX64"\n", conn, conn->rac_peer->rap_nid);
+        LASSERT(atomic_read(&conn->rac_refcount) > 0);
+        if (atomic_dec_and_test(&conn->rac_refcount))
+                __kranal_conn_decref(conn);
+}
+
+static inline struct list_head *
+kranal_cqid2connlist (__u32 cqid)
+{
+        unsigned int hash = cqid % kranal_data.kra_conn_hash_size;
+
+        return (&kranal_data.kra_conns [hash]);
+}
+
+static inline kra_conn_t *
+kranal_cqid2conn_locked (__u32 cqid)
+{
+        struct list_head *conns = kranal_cqid2connlist(cqid);
+        struct list_head *tmp;
+        kra_conn_t       *conn;
+
+        list_for_each(tmp, conns) {
+                conn = list_entry(tmp, kra_conn_t, rac_hashlist);
+
+                if (conn->rac_cqid == cqid)
+                        return conn;
+        }
+
+        return NULL;
+}
+
+static inline int
+kranal_tx_mapped (kra_tx_t *tx)
+{
+        return (tx->tx_buftype == RANAL_BUF_VIRT_MAPPED ||
+                tx->tx_buftype == RANAL_BUF_PHYS_MAPPED);
+}
+
+#if CONFIG_X86
+static inline __u64
+kranal_page2phys (struct page *p)
+{
+        __u64 page_number = p - mem_map;
+
+        return (page_number << PAGE_SHIFT);
+}
+#else
+# error "no page->phys"
+#endif
+
diff --git a/lustre/portals/knals/ranal/ranal_cb.c b/lustre/portals/knals/ranal/ranal_cb.c
new file mode 100644
index 0000000..b491d71
--- /dev/null
+++ b/lustre/portals/knals/ranal/ranal_cb.c
@@ -0,0 +1,1754 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2004 Cluster File Systems, Inc.
+ *   Author: Eric Barton
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ * + */ + +#include "ranal.h" + +int +kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist) +{ + /* I would guess that if kranal_get_peer (nid) == NULL, + and we're not routing, then 'nid' is very distant :) */ + if ( nal->libnal_ni.ni_pid.nid == nid ) { + *dist = 0; + } else { + *dist = 1; + } + + return 0; +} + +void +kranal_device_callback(RAP_INT32 devid) +{ + kra_device_t *dev; + int i; + + for (i = 0; i < kranal_data.kra_ndevs; i++) { + + dev = &kranal_data.kra_devices[i]; + if (dev->rad_id != devid) + continue; + + spin_lock_irqsave(&dev->rad_lock, flags); + + if (!dev->rad_ready) { + dev->rad_ready = 1; + wake_up(&dev->rad_waitq); + } + + spin_unlock_irqrestore(&dev->rad_lock, flags); + return; + } + + CWARN("callback for unknown device %d\n", devid); +} + +void +kranal_schedule_conn(kra_conn_t *conn) +{ + kra_device_t *dev = conn->rac_device; + unsigned long flags; + + spin_lock_irqsave(&dev->rad_lock, flags); + + if (!conn->rac_scheduled) { + kranal_conn_addref(conn); /* +1 ref for scheduler */ + conn->rac_scheduled = 1; + list_add_tail(&conn->rac_schedlist, &dev->rad_connq); + wake_up(&dev->rad_waitq); + } + + spin_unlock_irqrestore(&dev->rad_lock, flags); +} + +void +kranal_schedule_cqid (__u32 cqid) +{ + kra_conn_t *conn; + struct list_head *conns; + struct list_head *tmp; + + conns = kranal_cqid2connlist(cqid); + + read_lock(&kranal_data.kra_global_lock); + + conn = kranal_cqid2conn_locked(cqid); + + if (conn == NULL) + CWARN("no cqid %x\n", cqid); + else + kranal_schedule_conn(conn); + + read_unlock(&kranal_data.kra_global_lock); +} + +void +kranal_schedule_dev(kra_device_t *dev) +{ + kra_conn_t *conn; + struct list_head *conns; + struct list_head *tmp; + int i; + + /* Don't do this in IRQ context (servers may have 1000s of clients) */ + LASSERT (!in_interrupt()); + + CWARN("Scheduling ALL conns on device %d\n", dev->rad_id); + + for (i = 0; i < kranal_data.kra_conn_hash_size; i++) { + + /* Drop the lock on each hash bucket to ensure we don't + * block anyone for too long at IRQ priority on another CPU */ + + read_lock(&kranal_data.kra_global_lock); + + conns = &kranal_data.kra_conns[i]; + + list_for_each (tmp, conns) { + conn = list_entry(tmp, kra_conn_t, rac_hashlist); + + if (conn->rac_device == dev) + kranal_schedule_conn(conn); + } + read_unlock(&kranal_data.kra_global_lock); + } +} + +void +kranal_tx_done (kra_tx_t *tx, int completion) +{ + ptl_err_t ptlrc = (completion == 0) ? 
+                           PTL_OK : PTL_FAIL;
+        kra_device_t  *dev;
+        unsigned long  flags;
+        int            i;
+        RAP_RETURN     rrc;
+
+        LASSERT (!in_interrupt());
+
+        switch (tx->tx_buftype) {
+        default:
+                LBUG();
+
+        case RANAL_BUF_NONE:
+        case RANAL_BUF_IMMEDIATE:
+        case RANAL_BUF_PHYS_UNMAPPED:
+        case RANAL_BUF_VIRT_UNMAPPED:
+                break;
+
+        case RANAL_BUF_PHYS_MAPPED:
+                LASSERT (tx->tx_conn != NULL);
+                dev = tx->tx_conn->rac_device;
+                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
+                                           dev->rad_ptag, &tx->tx_map_key);
+                LASSERT (rrc == RAP_SUCCESS);
+                break;
+
+        case RANAL_BUF_VIRT_MAPPED:
+                LASSERT (tx->tx_conn != NULL);
+                dev = tx->tx_conn->rac_device;
+                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
+                                           dev->rad_ptag, &tx->tx_map_key);
+                LASSERT (rrc == RAP_SUCCESS);
+                break;
+        }
+
+        for (i = 0; i < 2; i++) {
+                /* tx may have up to 2 libmsgs to finalise */
+                if (tx->tx_libmsg[i] == NULL)
+                        continue;
+
+                lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
+                tx->tx_libmsg[i] = NULL;
+        }
+
+        tx->tx_buftype = RANAL_BUF_NONE;
+        tx->tx_msg.ram_type = RANAL_MSG_NONE;
+        tx->tx_conn = NULL;
+
+        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
+
+        if (tx->tx_isnblk) {
+                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
+        } else {
+                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
+                wake_up(&kranal_data.kra_idle_tx_waitq);
+        }
+
+        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
+}
+
+kra_tx_t *
+kranal_get_idle_tx (int may_block)
+{
+        unsigned long  flags;
+        kra_tx_t      *tx = NULL;
+
+        for (;;) {
+                spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
+
+                /* "normal" descriptor is free */
+                if (!list_empty(&kranal_data.kra_idle_txs)) {
+                        tx = list_entry(kranal_data.kra_idle_txs.next,
+                                        kra_tx_t, tx_list);
+                        break;
+                }
+
+                if (!may_block) {
+                        /* may dip into reserve pool */
+                        if (list_empty(&kranal_data.kra_idle_nblk_txs)) {
+                                CERROR("reserved tx desc pool exhausted\n");
+                                break;
+                        }
+
+                        tx = list_entry(kranal_data.kra_idle_nblk_txs.next,
+                                        kra_tx_t, tx_list);
+                        break;
+                }
+
+                /* block for idle tx */
+                spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
+
+                wait_event(kranal_data.kra_idle_tx_waitq,
+                           !list_empty(&kranal_data.kra_idle_txs));
+        }
+
+        if (tx != NULL) {
+                list_del(&tx->tx_list);
+
+                /* Allocate a new completion cookie.  It might not be
+                 * needed, but we've got a lock right now...
+                 */
+                tx->tx_cookie = kranal_data.kra_next_tx_cookie++;
+
+                LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
+                LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
+                LASSERT (tx->tx_conn == NULL);
+                LASSERT (tx->tx_libmsg[0] == NULL);
+                LASSERT (tx->tx_libmsg[1] == NULL);
+        }
+
+        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
+
+        return tx;
+}
+
+void
+kranal_init_msg(kra_msg_t *msg, int type)
+{
+        msg->ram_magic = RANAL_MSG_MAGIC;
+        msg->ram_version = RANAL_MSG_VERSION;
+        msg->ram_type = type;
+        msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
+        /* ram_incarnation gets set when FMA is sent */
+}
+
+kra_tx_t *
+kranal_new_tx_msg (int may_block, int type)
+{
+        kra_tx_t *tx = kranal_get_idle_tx(may_block);
+
+        if (tx == NULL)
+                return NULL;
+
+        kranal_init_msg(&tx->tx_msg, type);
+        return tx;
+}
+
+int
+kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
+                               int offset, int nob)
+{
+        LASSERT (nob > 0);
+        LASSERT (niov > 0);
+        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
+
+        while (offset >= iov->iov_len) {
+                offset -= iov->iov_len;
+                niov--;
+                iov++;
+                LASSERT (niov > 0);
+        }
+
+        if (nob > iov->iov_len - offset) {
+                CERROR("Can't handle multiple vaddr fragments\n");
+                return -EMSGSIZE;
+        }
+
+        tx->tx_buftype = RANAL_BUF_IMMEDIATE;
+        tx->tx_nob = nob;
+        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
+        return 0;
+}
+
+int
+kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
+                          int offset, int nob)
+{
+        LASSERT (nob > 0);
+        LASSERT (niov > 0);
+        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
+
+        while (offset >= iov->iov_len) {
+                offset -= iov->iov_len;
+                niov--;
+                iov++;
+                LASSERT (niov > 0);
+        }
+
+        if (nob > iov->iov_len - offset) {
+                CERROR("Can't handle multiple vaddr fragments\n");
+                return -EMSGSIZE;
+        }
+
+        tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
+        tx->tx_nob = nob;
+        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
+        return 0;
+}
+
+int
+kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
+                          int offset, int nob)
+{
+        RAP_PHYS_REGION *phys = tx->tx_phys;
+        int              resid;
+
+        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
+
+        LASSERT (nob > 0);
+        LASSERT (nkiov > 0);
+        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
+
+        while (offset >= kiov->kiov_len) {
+                offset -= kiov->kiov_len;
+                nkiov--;
+                kiov++;
+                LASSERT (nkiov > 0);
+        }
+
+        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
+        tx->tx_nob = nob;
+        tx->tx_buffer = NULL;
+        tx->tx_phys_offset = kiov->kiov_offset + offset;
+
+        phys->Address = kranal_page2phys(kiov->kiov_page);
+        phys->Length = PAGE_SIZE;
+        phys++;
+
+        resid = nob - (kiov->kiov_len - offset);
+        while (resid > 0) {
+                kiov++;
+                nkiov--;
+                LASSERT (nkiov > 0);
+
+                if (kiov->kiov_offset != 0 ||
+                    ((resid > PAGE_SIZE) &&
+                     kiov->kiov_len < PAGE_SIZE)) {
+                        int nphys = (int)(phys - tx->tx_phys);
+                        int i;
+                        /* Can't have gaps */
+                        CERROR("Can't make payload contiguous in I/O VM:"
+                               "page %d, offset %d, len %d \n", nphys,
+                               kiov->kiov_offset, kiov->kiov_len);
+
+                        for (i = -nphys; i < nkiov; i++) {
+                                CERROR("kiov[%d] %p +%d for %d\n",
+                                       i, kiov[i].kiov_page,
+                                       kiov[i].kiov_offset, kiov[i].kiov_len);
+                        }
+
+                        return -EINVAL;
+                }
+
+                if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) {
+                        CERROR ("payload too big (%d)\n",
+                                (int)(phys - tx->tx_phys));
+                        return -EMSGSIZE;
+                }
+
+                phys->Address = kranal_page2phys(kiov->kiov_page);
+                phys->Length = PAGE_SIZE;
+                phys++;
+
+                resid -= PAGE_SIZE;
+        }
+
+        tx->tx_phys_npages = phys - tx->tx_phys;
+        return 0;
+}
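+
+/* The three helpers above only record where the payload lives; nothing is
+ * registered with the RapidArray device until kranal_map_buffer() runs on
+ * the connection/scheduler path.  For orientation only, a sender typically
+ * does roughly (names as used elsewhere in this file):
+ *
+ *         tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
+ *         if (tx == NULL)
+ *                 return PTL_NO_SPACE;
+ *         rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
+ *         if (rc != 0)
+ *                 kranal_tx_done(tx, rc);
+ *         else
+ *                 kranal_launch_tx(tx, nid);
+ *
+ * Exactly one of 'iov' (virtual addresses) or 'kiov' (kernel pages) may be
+ * non-NULL; see kranal_setup_buffer() below. */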
+
+static inline int
+kranal_setup_buffer (kra_tx_t *tx, int niov,
+                     struct iovec *iov, ptl_kiov_t *kiov,
+                     int offset, int nob)
+{
+        LASSERT ((iov == NULL) != (kiov == NULL));
+
+        if (kiov != NULL)
+                return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);
+
+        return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
+}
+
+void
+kranal_map_buffer (kra_tx_t *tx)
+{
+        kra_conn_t   *conn = tx->tx_conn;
+        kra_device_t *dev = conn->rac_device;
+        RAP_RETURN    rrc;
+
+        switch (tx->tx_buftype) {
+        default:
+                LBUG();
+
+        case RANAL_BUF_PHYS_UNMAPPED:
+                rrc = RapkRegisterPhys(conn->rac_device->rad_handle,
+                                       tx->tx_phys, tx->tx_phys_npages,
+                                       conn->rac_device->rad_ptag,
+                                       &tx->tx_map_key);
+                LASSERT (rrc == RAP_SUCCESS);
+                tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
+                return;
+
+        case RANAL_BUF_VIRT_UNMAPPED:
+                rrc = RapkRegisterMemory(conn->rac_device->rad_handle,
+                                         tx->tx_buffer, tx->tx_nob,
+                                         conn->rac_device->rad_ptag,
+                                         &tx->tx_map_key);
+                LASSERT (rrc == RAP_SUCCESS);
+                tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
+                return;
+        }
+}
+
+kra_conn_t *
+kranal_find_conn_locked (kra_peer_t *peer)
+{
+        struct list_head *tmp;
+
+        /* just return the first connection */
+        list_for_each (tmp, &peer->rap_conns) {
+                return list_entry(tmp, kra_conn_t, rac_list);
+        }
+
+        return NULL;
+}
+
+void
+kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
+{
+        unsigned long flags;
+
+        tx->tx_conn = conn;
+
+        spin_lock_irqsave(&conn->rac_lock, flags);
+        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
+        tx->tx_qtime = jiffies;
+        spin_unlock_irqrestore(&conn->rac_lock, flags);
+
+        kranal_schedule_conn(conn);
+}
+
+void
+kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
+{
+        unsigned long  flags;
+        kra_peer_t    *peer;
+        kra_conn_t    *conn;
+        unsigned long  now;
+        rwlock_t      *g_lock = &kranal_data.kra_global_lock;
+
+        /* If I get here, I've committed to send, so I complete the tx with
+         * failure on any problems */
+
+        LASSERT (tx->tx_conn == NULL);      /* only set when assigned a conn */
+
+        read_lock(g_lock);
+
+        peer = kranal_find_peer_locked(nid);
+        if (peer == NULL) {
+                read_unlock(g_lock);
+                kranal_tx_done(tx, -EHOSTUNREACH);
+                return;
+        }
+
+        conn = kranal_find_conn_locked(peer);
+        if (conn != NULL) {
+                kranal_post_fma(conn, tx);
+                read_unlock(g_lock);
+                return;
+        }
+
+        /* Making one or more connections; I'll need a write lock... */
+        read_unlock(g_lock);
+        write_lock_irqsave(g_lock, flags);
+
+        peer = kranal_find_peer_locked(nid);
+        if (peer == NULL) {
+                write_unlock_irqrestore(g_lock, flags);
+                kranal_tx_done(tx, -EHOSTUNREACH);
+                return;
+        }
+
+        conn = kranal_find_conn_locked(peer);
+        if (conn != NULL) {
+                /* Connection exists; queue message on it */
+                kranal_post_fma(conn, tx);
+                write_unlock_irqrestore(g_lock, flags);
+                return;
+        }
+
+        LASSERT (peer->rap_persistence > 0);
+
+        if (!peer->rap_connecting) {
+                now = CURRENT_TIME;
+                if (now < peer->rap_reconnect_time) {
+                        write_unlock_irqrestore(g_lock, flags);
+                        kranal_tx_done(tx, -EHOSTUNREACH);
+                        return;
+                }
+
+                peer->rap_connecting = 1;
+                kranal_peer_addref(peer); /* extra ref for connd */
+
+                spin_lock(&kranal_data.kra_connd_lock);
+
+                list_add_tail(&peer->rap_connd_list,
+                              &kranal_data.kra_connd_peers);
+                wake_up(&kranal_data.kra_connd_waitq);
+
+                spin_unlock(&kranal_data.kra_connd_lock);
+        }
+
+        /* A connection is being established; queue the message...
+         */
+        list_add_tail(&tx->tx_list, &peer->rap_tx_queue);
+
+        write_unlock_irqrestore(g_lock, flags);
+}
+
+static void
+kranal_rdma(kra_tx_t *tx, int type,
+            kra_rdma_desc_t *rard, int nob, __u64 cookie)
+{
+        kra_conn_t    *conn = tx->tx_conn;
+        RAP_RETURN     rrc;
+        unsigned long  flags;
+
+        /* prep final completion message */
+        kranal_init_msg(&tx->tx_msg, type);
+        tx->tx_msg.ram_u.completion.racm_cookie = cookie;
+
+        LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
+                 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
+        LASSERT (nob <= rard->rard_nob);
+
+        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
+        tx->tx_rdma_desc.SrcPtr = tx->tx_buffer;
+        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
+        tx->tx_rdma_desc.DstPtr = rard->rard_addr;
+        tx->tx_rdma_desc.DstKey = rard->rard_key;
+        tx->tx_rdma_desc.Length = nob;
+        tx->tx_rdma_desc.AppPtr = tx;
+
+        if (nob == 0) { /* Immediate completion */
+                kranal_post_fma(conn, tx);
+                return;
+        }
+
+        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
+        LASSERT (rrc == RAP_SUCCESS);
+
+        spin_lock_irqsave(&conn->rac_lock, flags);
+        list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
+        tx->tx_qtime = jiffies;
+        spin_unlock_irqrestore(&conn->rac_lock, flags);
+}
+
+int
+kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
+{
+        __u32      nob_received = nob;
+        RAP_RETURN rrc;
+
+        LASSERT (conn->rac_rxmsg != NULL);
+
+        rrc = RapkFmaCopyToUser(conn->rac_rihandle, buffer,
+                                &nob_received, sizeof(kra_msg_t));
+        LASSERT (rrc == RAP_SUCCESS);
+
+        conn->rac_rxmsg = NULL;
+
+        if (nob_received != nob) {
+                CWARN("Expected %d immediate bytes but got %d\n",
+                      nob, nob_received);
+                return -EPROTO;
+        }
+
+        return 0;
+}
+
+ptl_err_t
+kranal_do_send (lib_nal_t    *nal,
+                void         *private,
+                lib_msg_t    *libmsg,
+                ptl_hdr_t    *hdr,
+                int           type,
+                ptl_nid_t     nid,
+                ptl_pid_t     pid,
+                unsigned int  niov,
+                struct iovec *iov,
+                ptl_kiov_t   *kiov,
+                size_t        offset,
+                size_t        nob)
+{
+        kra_conn_t *conn;
+        kra_tx_t   *tx;
+        int         rc;
+
+        /* NB 'private' is different depending on what we're sending.... */
+
+        CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
+               " pid %d\n", nob, niov, nid, pid);
+
+        LASSERT (nob == 0 || niov > 0);
+        LASSERT (niov <= PTL_MD_MAX_IOV);
+
+        LASSERT (!in_interrupt());
+        /* payload is either all vaddrs or all pages */
+        LASSERT (!(kiov != NULL && iov != NULL));
+
+        switch(type) {
+        default:
+                LBUG();
+
+        case PTL_MSG_REPLY: {
+                /* reply's 'private' is the conn that received the GET_REQ */
+                conn = private;
+                LASSERT (conn->rac_rxmsg != NULL);
+
+                if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
+                        if (nob > RANAL_MAX_IMMEDIATE) {
+                                CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
+                                       nob, nid);
+                                return PTL_FAIL;
+                        }
+                        break;                  /* RDMA not expected */
+                }
+
+                /* Incoming message consistent with immediate reply?
+                 */
+                if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
+                        CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
+                               nid, conn->rac_rxmsg->ram_type);
+                        return PTL_FAIL;
+                }
+
+                tx = kranal_get_idle_tx(0);
+                if (tx == NULL)
+                        return PTL_FAIL;
+
+                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
+                if (rc != 0) {
+                        kranal_tx_done(tx, rc);
+                        return PTL_FAIL;
+                }
+
+                tx->tx_conn = conn;
+                tx->tx_libmsg[0] = libmsg;
+
+                kranal_map_buffer(tx);
+                kranal_rdma(tx, RANAL_MSG_GET_DONE,
+                            &conn->rac_rxmsg->ram_u.getreq.ragm_desc, nob,
+                            conn->rac_rxmsg->ram_u.getreq.ragm_cookie);
+                return PTL_OK;
+        }
+
+        case PTL_MSG_GET:
+                if (kiov == NULL &&             /* not paged */
+                    nob <= RANAL_MAX_IMMEDIATE &&       /* small enough */
+                    nob <= kranal_tunables.kra_max_immediate)
+                        break;                  /* send IMMEDIATE */
+
+                tx = kranal_new_tx_msg(0, RANAL_MSG_GET_REQ);
+                if (tx == NULL)
+                        return PTL_NO_SPACE;
+
+                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
+                if (rc != 0) {
+                        kranal_tx_done(tx, rc);
+                        return PTL_FAIL;
+                }
+
+                tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg);
+                if (tx->tx_libmsg[1] == NULL) {
+                        CERROR("Can't create reply for GET to "LPX64"\n", nid);
+                        kranal_tx_done(tx, rc);
+                        return PTL_FAIL;
+                }
+
+                tx->tx_libmsg[0] = libmsg;
+                tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
+                /* rest of tx_msg is setup just before it is sent */
+                kranal_launch_tx(tx, nid);
+                return PTL_OK;
+
+        case PTL_MSG_ACK:
+                LASSERT (nob == 0);
+                break;
+
+        case PTL_MSG_PUT:
+                if (kiov == NULL &&             /* not paged */
+                    nob <= RANAL_MAX_IMMEDIATE &&       /* small enough */
+                    nob <= kranal_tunables.kra_max_immediate)
+                        break;                  /* send IMMEDIATE */
+
+                tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
+                if (tx == NULL)
+                        return PTL_NO_SPACE;
+
+                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
+                if (rc != 0) {
+                        kranal_tx_done(tx, rc);
+                        return PTL_FAIL;
+                }
+
+                tx->tx_libmsg[0] = libmsg;
+                tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
+                /* rest of tx_msg is setup just before it is sent */
+                kranal_launch_tx(tx, nid);
+                return PTL_OK;
+        }
+
+        LASSERT (kiov == NULL);
+        LASSERT (nob <= RANAL_MAX_IMMEDIATE);
+
+        tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
+                                 type == PTL_MSG_REPLY ||
+                                 in_interrupt()),
+                               RANAL_MSG_IMMEDIATE);
+        if (tx == NULL)
+                return PTL_NO_SPACE;
+
+        rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
+        if (rc != 0) {
+                kranal_tx_done(tx, rc);
+                return PTL_FAIL;
+        }
+
+        tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
+        tx->tx_libmsg[0] = libmsg;
+        kranal_launch_tx(tx, nid);
+        return PTL_OK;
+}
+
+ptl_err_t
+kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
+             ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
+             unsigned int niov, struct iovec *iov,
+             size_t offset, size_t len)
+{
+        return kranal_do_send(nal, private, cookie,
+                              hdr, type, nid, pid,
+                              niov, iov, NULL,
+                              offset, len);
+}
+
+ptl_err_t
+kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
+                   ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
+                   unsigned int niov, ptl_kiov_t *kiov,
+                   size_t offset, size_t len)
+{
+        return kranal_do_send(nal, private, cookie,
+                              hdr, type, nid, pid,
+                              niov, NULL, kiov,
+                              offset, len);
+}
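+
+/* Message flow (as implemented in kranal_do_send() above and
+ * kranal_recvmsg() below; summarised here for orientation only):
+ *
+ *   small PUT/GET/ACK/REPLY  -> RANAL_MSG_IMMEDIATE, payload sent inline
+ *   large PUT                -> PUT_REQ; peer replies PUT_ACK (or PUT_NAK),
+ *                               initiator RDMAs the data, then sends PUT_DONE
+ *   large GET                -> GET_REQ; target RDMAs the data (or GET_NAK),
+ *                               then sends GET_DONE
+ *
+ * The RANAL_TX_* states in ranal.h track a tx through these exchanges. */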
+ptl_err_t
+kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
+                unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
+                size_t offset, size_t mlen, size_t rlen)
+{
+        kra_conn_t *conn = private;
+        kra_msg_t  *rxmsg = conn->rac_rxmsg;
+        kra_tx_t   *tx;
+        void       *buffer;
+        int         rc;
+
+        LASSERT (mlen <= rlen);
+        LASSERT (!in_interrupt());
+        /* Either all pages or all vaddrs */
+        LASSERT (!(kiov != NULL && iov != NULL));
+
+        switch(rxmsg->ram_type) {
+        default:
+                LBUG();
+                return PTL_FAIL;
+
+        case RANAL_MSG_IMMEDIATE:
+                if (mlen == 0) {
+                        buffer = NULL;
+                } else if (kiov != NULL) {
+                        CERROR("Can't recv immediate into paged buffer\n");
+                        return PTL_FAIL;
+                } else {
+                        LASSERT (niov > 0);
+                        while (offset >= iov->iov_len) {
+                                offset -= iov->iov_len;
+                                iov++;
+                                niov--;
+                                LASSERT (niov > 0);
+                        }
+                        if (mlen > iov->iov_len - offset) {
+                                CERROR("Can't handle immediate frags\n");
+                                return PTL_FAIL;
+                        }
+                        buffer = ((char *)iov->iov_base) + offset;
+                }
+                rc = kranal_consume_rxmsg(conn, buffer, mlen);
+                lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
+                return PTL_OK;
+
+        case RANAL_MSG_GET_REQ:
+                /* If the GET matched, we've already handled it in
+                 * kranal_do_send which is called to send the REPLY.  We're
+                 * only called here to complete the GET receive (if we needed
+                 * it which we don't, but I digress...) */
+                LASSERT (libmsg == NULL);
+                lib_finalize(nal, NULL, libmsg, PTL_OK);
+                return PTL_OK;
+
+        case RANAL_MSG_PUT_REQ:
+                if (libmsg == NULL) {           /* PUT didn't match... */
+                        lib_finalize(nal, NULL, libmsg, PTL_OK);
+                        return PTL_OK;
+                }
+
+                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);
+                if (tx == NULL)
+                        return PTL_NO_SPACE;
+
+                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, mlen);
+                if (rc != 0) {
+                        kranal_tx_done(tx, rc);
+                        return PTL_FAIL;
+                }
+
+                kranal_map_buffer(tx);
+
+                tx->tx_msg.ram_u.putack.rapam_src_cookie =
+                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
+                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
+                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
+                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr = tx->tx_buffer;
+                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;
+
+                tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */
+
+                kranal_post_fma(conn, tx);
+
+                /* flag matched by consuming rx message */
+                kranal_consume_rxmsg(conn, NULL, 0);
+                return PTL_OK;
+        }
+}
+
+ptl_err_t
+kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
+             unsigned int niov, struct iovec *iov,
+             size_t offset, size_t mlen, size_t rlen)
+{
+        return kranal_recvmsg(nal, private, msg, niov, iov, NULL,
+                              offset, mlen, rlen);
+}
+
+ptl_err_t
+kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
+                   unsigned int niov, ptl_kiov_t *kiov,
+                   size_t offset, size_t mlen, size_t rlen)
+{
+        return kranal_recvmsg(nal, private, msg, niov, NULL, kiov,
+                              offset, mlen, rlen);
+}
+
+int
+kranal_thread_start (int(*fn)(void *arg), void *arg)
+{
+        long pid = kernel_thread(fn, arg, 0);
+
+        if (pid < 0)
+                return (int)pid;
+
+        atomic_inc(&kranal_data.kra_nthreads);
+        return 0;
+}
+
+void
+kranal_thread_fini (void)
+{
+        atomic_dec(&kranal_data.kra_nthreads);
+}
+
+int
+kranal_check_conn (kra_conn_t *conn)
+{
+        kra_tx_t         *tx;
+        struct list_head *ttmp;
+        unsigned long     flags;
+        long              timeout;
+        unsigned long     now = jiffies;
+
+        if (!conn->rac_closing &&
+            time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
+                /* not sent in a while; schedule conn so scheduler sends a keepalive */
+                kranal_schedule_conn(conn);
+        }
+
+        /* wait twice as long for CLOSE to be sure peer is dead */
+        timeout = (conn->rac_closing ?
+                   1 : 2) * conn->rac_timeout * HZ;
+
+        if (!conn->rac_close_recvd &&
+            time_after_eq(now, conn->rac_last_rx + timeout)) {
+                CERROR("Nothing received from "LPX64" within %d seconds\n",
+                       conn->rac_peer->rap_nid,
+                       (int)((now - conn->rac_last_rx)/HZ));
+                return -ETIMEDOUT;
+        }
+
+        if (conn->rac_closing)
+                return 0;
+
+        /* Check the conn's queues are moving.  These are "belt+braces" checks,
+         * in case of hardware/software errors that make this conn seem
+         * responsive even though it isn't progressing its message queues. */
+
+        spin_lock_irqsave(&conn->rac_lock, flags);
+
+        list_for_each (ttmp, &conn->rac_fmaq) {
+                tx = list_entry(ttmp, kra_tx_t, tx_list);
+
+                if (time_after_eq(now, tx->tx_qtime + timeout)) {
+                        spin_unlock_irqrestore(&conn->rac_lock, flags);
+                        CERROR("tx on fmaq for "LPX64" blocked %d seconds\n",
+                               conn->rac_peer->rap_nid,
+                               (int)((now - tx->tx_qtime)/HZ));
+                        return -ETIMEDOUT;
+                }
+        }
+
+        list_for_each (ttmp, &conn->rac_rdmaq) {
+                tx = list_entry(ttmp, kra_tx_t, tx_list);
+
+                if (time_after_eq(now, tx->tx_qtime + timeout)) {
+                        spin_unlock_irqrestore(&conn->rac_lock, flags);
+                        CERROR("tx on rdmaq for "LPX64" blocked %d seconds\n",
+                               conn->rac_peer->rap_nid,
+                               (int)((now - tx->tx_qtime)/HZ));
+                        return -ETIMEDOUT;
+                }
+        }
+
+        list_for_each (ttmp, &conn->rac_replyq) {
+                tx = list_entry(ttmp, kra_tx_t, tx_list);
+
+                if (time_after_eq(now, tx->tx_qtime + timeout)) {
+                        spin_unlock_irqrestore(&conn->rac_lock, flags);
+                        CERROR("tx on replyq for "LPX64" blocked %d seconds\n",
+                               conn->rac_peer->rap_nid,
+                               (int)((now - tx->tx_qtime)/HZ));
+                        return -ETIMEDOUT;
+                }
+        }
+
+        spin_unlock_irqrestore(&conn->rac_lock, flags);
+        return 0;
+}
+
+void
+kranal_check_conns (int idx, unsigned long *min_timeoutp)
+{
+        struct list_head *conns = &kranal_data.kra_conns[idx];
+        struct list_head *ctmp;
+        kra_conn_t       *conn;
+        unsigned long     flags;
+        int               rc;
+
+ again:
+        /* NB. We expect to check all the conns and not find any problems, so
+         * we just use a shared lock while we take a look...
+         */
+        read_lock(&kranal_data.kra_global_lock);
+
+        list_for_each (ctmp, conns) {
+                conn = list_entry(ctmp, kra_conn_t, rac_hashlist);
+
+                if (conn->rac_timeout < *min_timeoutp )
+                        *min_timeoutp = conn->rac_timeout;
+                if (conn->rac_keepalive < *min_timeoutp )
+                        *min_timeoutp = conn->rac_keepalive;
+
+                rc = kranal_check_conn(conn);
+                if (rc == 0)
+                        continue;
+
+                kranal_conn_addref(conn);
+                read_unlock(&kranal_data.kra_global_lock);
+
+                CERROR("Check on conn to "LPX64" failed: %d\n",
+                       conn->rac_peer->rap_nid, rc);
+
+                write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+                if (!conn->rac_closing)
+                        kranal_close_conn_locked(conn, -ETIMEDOUT);
+                else
+                        kranal_terminate_conn_locked(conn);
+
+                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+
+                kranal_conn_decref(conn);
+
+                /* start again now I've dropped the lock */
+                goto again;
+        }
+
+        read_unlock(&kranal_data.kra_global_lock);
+}
+
+int
+kranal_connd (void *arg)
+{
+        char           name[16];
+        wait_queue_t   wait;
+        unsigned long  flags;
+        kra_peer_t    *peer;
+        int            i;
+
+        snprintf(name, sizeof(name), "kranal_connd_%02ld", (long)arg);
+        kportal_daemonize(name);
+        kportal_blockallsigs();
+
+        init_waitqueue_entry(&wait, current);
+
+        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
+
+        while (!kranal_data.kra_shutdown) {
+                /* Safe: kra_shutdown only set when quiescent */
+
+                if (!list_empty(&kranal_data.kra_connd_peers)) {
+                        peer = list_entry(kranal_data.kra_connd_peers.next,
+                                          kra_peer_t, rap_connd_list);
+
+                        list_del_init(&peer->rap_connd_list);
+                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
+
+                        kranal_connect(peer);
+                        kranal_peer_decref(peer);
+
+                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
+                        continue;
+                }
+
+                set_current_state(TASK_INTERRUPTIBLE);
+                add_wait_queue(&kranal_data.kra_connd_waitq, &wait);
+
+                spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
+
+                schedule ();
+
+                set_current_state(TASK_RUNNING);
+                remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);
+
+                spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
+        }
+
+        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
+
+        kranal_thread_fini();
+        return 0;
+}
+
+void
+kranal_update_reaper_timeout(long timeout)
+{
+        unsigned long flags;
+
+        LASSERT (timeout > 0);
+
+        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
+
+        if (timeout < kranal_data.kra_new_min_timeout)
+                kranal_data.kra_new_min_timeout = timeout;
+
+        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
+}
+
+int
+kranal_reaper (void *arg)
+{
+        wait_queue_t   wait;
+        unsigned long  flags;
+        kra_conn_t    *conn;
+        kra_peer_t    *peer;
+        long           timeout;
+        int            i;
+        int            conn_entries = kranal_data.kra_conn_hash_size;
+        int            conn_index = 0;
+        int            base_index = conn_entries - 1;
+        unsigned long  next_check_time = jiffies;
+        long           next_min_timeout = MAX_SCHEDULE_TIMEOUT;
+        long           current_min_timeout = 1;
+
+        kportal_daemonize("kranal_reaper");
+        kportal_blockallsigs();
+
+        init_waitqueue_entry(&wait, current);
+
+        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
+        kranal_data.kra_new_min_timeout = 1;
+
+        while (!kranal_data.kra_shutdown) {
+
+                /* careful with the jiffy wrap... */
+                timeout = (long)(next_check_time - jiffies);
+                if (timeout <= 0) {
+
+                        /* I wake up every 'p' seconds to check for
+                         * timeouts on some more peers.  I try to check
+                         * every connection 'n' times within the global
+                         * minimum of all keepalive and timeout intervals,
+                         * to ensure I attend to every connection within
+                         * (n+1)/n times its timeout intervals.
+                         */
+
+                        const int p = 1;
+                        const int n = 3;
+                        unsigned long min_timeout;
+                        int chunk;
+
+                        if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
+                                /* new min timeout set: restart min timeout scan */
+                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
+                                base_index = conn_index - 1;
+                                if (base_index < 0)
+                                        base_index = conn_entries - 1;
+
+                                if (kranal_data.kra_new_min_timeout < current_min_timeout) {
+                                        current_min_timeout = kranal_data.kra_new_min_timeout;
+                                        CWARN("Set new min timeout %ld\n",
+                                              current_min_timeout);
+                                }
+
+                                kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
+                        }
+                        min_timeout = current_min_timeout;
+
+                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
+                                               flags);
+
+                        LASSERT (min_timeout > 0);
+
+                        /* Compute how many table entries to check now so I
+                         * get round the whole table fast enough (NB I do
+                         * this at fixed intervals of 'p' seconds) */
+                        chunk = conn_entries;
+                        if (min_timeout > n * p)
+                                chunk = (chunk * n * p) / min_timeout;
+                        if (chunk == 0)
+                                chunk = 1;
+
+                        for (i = 0; i < chunk; i++) {
+                                kranal_check_conns(conn_index,
+                                                   &next_min_timeout);
+                                conn_index = (conn_index + 1) % conn_entries;
+                        }
+
+                        next_check_time += p * HZ;
+
+                        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
+
+                        if (((conn_index - chunk <= base_index &&
+                              base_index < conn_index) ||
+                             (conn_index - conn_entries - chunk <= base_index &&
+                              base_index < conn_index - conn_entries))) {
+
+                                /* Scanned all conns: set current_min_timeout... */
+                                if (current_min_timeout != next_min_timeout) {
+                                        current_min_timeout = next_min_timeout;
+                                        CWARN("Set new min timeout %ld\n",
+                                              current_min_timeout);
+                                }
+
+                                /* ...and restart min timeout scan */
+                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
+                                base_index = conn_index - 1;
+                                if (base_index < 0)
+                                        base_index = conn_entries - 1;
+                        }
+                }
+
+                set_current_state(TASK_INTERRUPTIBLE);
+                add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
+
+                spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
+
+                schedule_timeout(timeout);
+
+                spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
+
+                set_current_state(TASK_RUNNING);
+                remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
+        }
+
+        kranal_thread_fini();
+        return 0;
+}
+
+void
+kranal_process_rdmaq (__u32 cqid)
+{
+        kra_conn_t          *conn;
+        kra_tx_t            *tx;
+        RAP_RETURN           rrc;
+        unsigned long        flags;
+        RAP_RDMA_DESCRIPTOR *desc;
+
+        read_lock(&kranal_data.kra_global_lock);
+
+        conn = kranal_cqid2conn_locked(cqid);
+        LASSERT (conn != NULL);
+
+        rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
+        LASSERT (rrc == RAP_SUCCESS);
+
+        spin_lock_irqsave(&conn->rac_lock, flags);
+
+        LASSERT (!list_empty(&conn->rac_rdmaq));
+        tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
+        list_del(&tx->tx_list);
+
+        LASSERT(desc->AppPtr == (void *)tx);
+        LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
+                tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);
+
+        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
+        tx->tx_qtime = jiffies;
+
+        spin_unlock_irqrestore(&conn->rac_lock, flags);
+
+        /* Get conn's fmaq processed, now I've just put something there */
+        kranal_schedule_conn(conn);
+
+        read_unlock(&kranal_data.kra_global_lock);
+}
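+
+/* kranal_sendmsg() below stamps the outgoing message with this connection's
+ * incarnation and tx sequence number before handing it to FMA.  A return of
+ * -EAGAIN just means FMA is out of credits right now; callers such as
+ * kranal_process_fmaq() leave the tx queued and retry the next time the
+ * scheduler services this connection. */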
+
+int
+kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
+               void *immediate, int immediatenob)
+{
+        int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
+        RAP_RETURN rrc;
+
+        LASSERT (sizeof(*msg) <= RANAL_FMA_PREFIX_LEN);
+        LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
+                 immediatenob <= RANAL_FMA_MAX_DATA_LEN :
+                 immediatenob == 0);
+
+        msg->ram_incarnation = conn->rac_my_incarnation;
+        msg->ram_seq = conn->rac_tx_seq;
+
+        if (sync)
+                rrc = RapkFmaSyncSend(conn->rac_device->rad_handle,
+                                      immediate, immediatenob,
+                                      msg, sizeof(*msg));
+        else
+                rrc = RapkFmaSend(conn->rac_device->rad_handle,
+                                  immediate, immediatenob,
+                                  msg, sizeof(*msg));
+
+        switch (rrc) {
+        case RAP_SUCCESS:
+                conn->rac_last_tx = jiffies;
+                conn->rac_tx_seq++;
+                return 0;
+
+        case RAP_NOT_DONE:
+                return -EAGAIN;
+
+        default:
+                LBUG();
+        }
+}
+
+int
+kranal_process_fmaq (kra_conn_t *conn)
+{
+        unsigned long flags;
+        int           more_to_do;
+        kra_tx_t     *tx;
+        int           rc;
+        int           expect_reply;
+
+        /* NB I will be rescheduled some via a rad_fma_cq event if my FMA is
+         * out of credits when I try to send right now... */
+
+        if (conn->rac_closing) {
+
+                if (!list_empty(&conn->rac_rdmaq)) {
+                        /* Can't send CLOSE yet; I'm still waiting for RDMAs I
+                         * posted to finish */
+                        LASSERT (!conn->rac_close_sent);
+                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
+                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
+                        return 0;
+                }
+
+                if (conn->rac_close_sent)
+                        return 0;
+
+                kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
+                rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
+                conn->rac_close_sent = (rc == 0);
+                return 0;
+        }
+
+        spin_lock_irqsave(&conn->rac_lock, flags);
+
+        if (list_empty(&conn->rac_fmaq)) {
+
+                spin_unlock_irqrestore(&conn->rac_lock, flags);
+
+                if (time_after_eq(jiffies,
+                                  conn->rac_last_tx + conn->rac_keepalive * HZ)) {
+                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
+                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
+                }
+                return 0;
+        }
+
+        tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
+        list_del(&tx->tx_list);
+        more_to_do = !list_empty(&conn->rac_fmaq);
+
+        spin_unlock_irqrestore(&conn->rac_lock, flags);
+
+        expect_reply = 0;
+        switch (tx->tx_msg.ram_type) {
+        default:
+                LBUG();
+
+        case RANAL_MSG_IMMEDIATE:
+        case RANAL_MSG_PUT_NAK:
+        case RANAL_MSG_PUT_DONE:
+        case RANAL_MSG_GET_NAK:
+        case RANAL_MSG_GET_DONE:
+                rc = kranal_sendmsg(conn, &tx->tx_msg,
+                                    tx->tx_buffer, tx->tx_nob);
+                expect_reply = 0;
+                break;
+
+        case RANAL_MSG_PUT_REQ:
+                tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
+                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
+                kranal_map_buffer(tx);
+                expect_reply = 1;
+                break;
+
+        case RANAL_MSG_PUT_ACK:
+                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
+                expect_reply = 1;
+                break;
+
+        case RANAL_MSG_GET_REQ:
+                kranal_map_buffer(tx);
+                tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
+                tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
+                tx->tx_msg.ram_u.get.ragm_desc.rard_addr = tx->tx_buffer;
+                tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
+                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
+                expect_reply = 1;
+                break;
+        }
+
+        if (rc == -EAGAIN) {
+                /* replace at the head of the list for later */
+                spin_lock_irqsave(&conn->rac_lock, flags);
+                list_add(&tx->tx_list, &conn->rac_fmaq);
+                spin_unlock_irqrestore(&conn->rac_lock, flags);
+
+                return 0;
+        }
+
+        LASSERT (rc == 0);
+
+        if (!expect_reply) {
+                kranal_tx_done(tx, 0);
+        } else {
+                spin_lock_irqsave(&conn->rac_lock, flags);
+                list_add_tail(&tx->tx_list, &conn->rac_replyq);
+                tx->tx_qtime = jiffies;
+                spin_unlock_irqrestore(&conn->rac_lock, flags);
+        }
+
+        return more_to_do;
+}
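+
+/* Messages arrive in the sender's byte order.  kranal_process_receives()
+ * detects a byte-flipped RANAL_MSG_MAGIC and swabs the header fields,
+ * using the helper below for the RDMA descriptors carried by PUT_ACK and
+ * GET_REQ; everything else stays in host order once swabbed. */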
+
+static inline void
+kranal_swab_rdma_desc (kra_rdma_desc_t *d)
+{
+        __swab64s(&d->rard_key.Key);
+        __swab16s(&d->rard_key.Cookie);
+        __swab16s(&d->rard_key.MdHandle);
+        __swab32s(&d->rard_key.Flags);
+        __swab64s(&d->rard_addr);
+        __swab32s(&d->rard_nob);
+}
+
+kra_tx_t *
+kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
+{
+        struct list_head *ttmp;
+        kra_tx_t         *tx;
+
+        list_for_each(ttmp, &conn->rac_replyq) {
+                tx = list_entry(ttmp, kra_tx_t, tx_list);
+
+                if (tx->tx_cookie != cookie)
+                        continue;
+
+                if (tx->tx_msg.ram_type != type) {
+                        CWARN("Unexpected type %x (%x expected) "
+                              "matched reply from "LPX64"\n",
+                              tx->tx_msg.ram_type, type,
+                              conn->rac_peer->rap_nid);
+                        return NULL;
+                }
+
+                return tx;
+        }
+
+        CWARN("Unmatched reply from "LPX64"\n", conn->rac_peer->rap_nid);
+        return NULL;
+}
+
+int
+kranal_process_receives(kra_conn_t *conn)
+{
+        unsigned long  flags;
+        __u32          seq;
+        kra_tx_t      *tx;
+        kra_msg_t     *msg;
+        RAP_RETURN     rrc = RapkFmaGetPrefix(conn->rac_rihandle, &msg);
+        kra_peer_t    *peer = conn->rac_peer;
+
+        if (rrc == RAP_NOT_DONE)
+                return 0;
+
+        LASSERT (rrc == RAP_SUCCESS);
+        conn->rac_last_rx = jiffies;
+        seq = conn->rac_rx_seq++;
+
+        if (msg->ram_magic != RANAL_MSG_MAGIC) {
+                if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
+                        CERROR("Unexpected magic %08x from "LPX64"\n",
+                               msg->ram_magic, peer->rap_nid);
+                        goto out;
+                }
+
+                __swab32s(&msg->ram_magic);
+                __swab16s(&msg->ram_version);
+                __swab16s(&msg->ram_type);
+                __swab64s(&msg->ram_srcnid);
+                __swab64s(&msg->ram_incarnation);
+                __swab32s(&msg->ram_seq);
+
+                /* NB message type checked below; NOT here... */
+                switch (msg->ram_type) {
+                case RANAL_MSG_PUT_ACK:
+                        kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
+                        break;
+
+                case RANAL_MSG_GET_REQ:
+                        kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
+                        break;
+
+                default:
+                        break;
+                }
+        }
+
+        if (msg->ram_version != RANAL_MSG_VERSION) {
+                CERROR("Unexpected protocol version %d from "LPX64"\n",
+                       msg->ram_version, peer->rap_nid);
+                goto out;
+        }
+
+        if (msg->ram_srcnid != peer->rap_nid) {
+                CERROR("Unexpected peer "LPX64" from "LPX64"\n",
+                       msg->ram_srcnid, peer->rap_nid);
+                goto out;
+        }
+
+        if (msg->ram_incarnation != conn->rac_peer_incarnation) {
+                CERROR("Unexpected incarnation "LPX64"("LPX64
+                       " expected) from "LPX64"\n",
+                       msg->ram_incarnation, conn->rac_peer_incarnation,
+                       peer->rap_nid);
+                goto out;
+        }
+
+        if (msg->ram_seq != seq) {
+                CERROR("Unexpected sequence number %d(%d expected) from "
+                       LPX64"\n", msg->ram_seq, seq, peer->rap_nid);
+                goto out;
+        }
+
+        if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
+                /* This message signals RDMA completion: wait now...
+                 */
+                rrc = RapkFmaSyncWait(conn->rac_rihandle);
+                LASSERT (rrc == RAP_SUCCESS);
+        }
+
+        if (msg->ram_type == RANAL_MSG_CLOSE) {
+                conn->rac_close_recvd = 1;
+                write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+                if (!conn->rac_closing)
+                        kranal_close_conn_locked(conn, -ETIMEDOUT);
+                else if (conn->rac_close_sent)
+                        kranal_terminate_conn_locked(conn);
+
+                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+                goto out;
+        }
+
+        if (conn->rac_closing)
+                goto out;
+
+        conn->rac_rxmsg = msg;          /* stash message for portals callbacks */
+                                        /* they'll NULL rac_rxmsg if they consume it */
+        switch (msg->ram_type) {
+        case RANAL_MSG_NOOP:
+                /* Nothing to do; just a keepalive */
+                break;
+
+        case RANAL_MSG_IMMEDIATE:
+                lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);
+                break;
+
+        case RANAL_MSG_PUT_REQ:
+                lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);
+
+                if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
+                        break;
+
+                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);
+                if (tx == NULL)
+                        break;
+
+                tx->tx_msg.ram_u.completion.racm_cookie =
+                        msg->ram_u.putreq.raprm_cookie;
+                kranal_post_fma(conn, tx);
+                break;
+
+        case RANAL_MSG_PUT_NAK:
+                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
+                                        msg->ram_u.completion.racm_cookie);
+                if (tx == NULL)
+                        break;
+
+                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
+                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
+                kranal_tx_done(tx, -ENOENT);    /* no match */
+                break;
+
+        case RANAL_MSG_PUT_ACK:
+                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
+                                        msg->ram_u.putack.rapam_src_cookie);
+                if (tx == NULL)
+                        break;
+
+                kranal_rdma(tx, RANAL_MSG_PUT_DONE,
+                            &msg->ram_u.putack.rapam_desc,
+                            msg->ram_u.putack.rapam_desc.rard_nob,
+                            msg->ram_u.putack.rapam_dst_cookie);
+                break;
+
+        case RANAL_MSG_PUT_DONE:
+                tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
+                                        msg->ram_u.completion.racm_cookie);
+                if (tx == NULL)
+                        break;
+
+                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
+                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
+                kranal_tx_done(tx, 0);
+                break;
+
+        case RANAL_MSG_GET_REQ:
+                lib_parse(&kranal_lib, &msg->ram_u.getreq.ragm_hdr, conn);
+
+                if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
+                        break;
+
+                tx = kranal_new_tx_msg(0, RANAL_MSG_GET_NAK);
+                if (tx == NULL)
+                        break;
+
+                tx->tx_msg.ram_u.completion.racm_cookie =
+                        msg->ram_u.getreq.ragm_cookie;
+                kranal_post_fma(conn, tx);
+                break;
+
+        case RANAL_MSG_GET_NAK:
+                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
+                                        msg->ram_u.completion.racm_cookie);
+                if (tx == NULL)
+                        break;
+
+                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
+                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
+                kranal_tx_done(tx, -ENOENT);    /* no match */
+                break;
+
+        case RANAL_MSG_GET_DONE:
+                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
+                                        msg->ram_u.completion.racm_cookie);
+                if (tx == NULL)
+                        break;
+
+                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
+                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
+                kranal_tx_done(tx, 0);
+                break;
+        }
+
+ out:
+        if (conn->rac_rxmsg != NULL)
+                kranal_consume_rxmsg(conn, NULL, 0);
+
+        return 1;
+}
+
+int
+kranal_scheduler (void *arg)
+{
+        kra_device_t  *dev = (kra_device_t *)arg;
+        wait_queue_t   wait;
+        char           name[16];
+        kra_conn_t    *conn;
+        unsigned long  flags;
+        RAP_RETURN     rrc;
+        __u32          cqid;
+        __u32          event_type;
+        int            rc;
+        int            i;
+        int            did_something;
+        int            resched;
+        int            busy_loops = 0;
+
+        snprintf(name, sizeof(name), "kranal_sd_%02ld", dev->rad_idx);
+        kportal_daemonize(name);
+        kportal_blockallsigs();
+
+        init_waitqueue_entry(&wait, current);
+
+        spin_lock_irqsave(&dev->rad_lock, flags);
+
+        while (!kranal_data.kra_shutdown) {
+                /* Safe: kra_shutdown only set when quiescent */
+
+                if (busy_loops++ >=
RANAL_RESCHED) { + spin_unlock_irqrestore(&dev->rad_lock, flags); + + our_cond_resched(); + busy_loops = 0; + + spin_lock_irqsave(&dev->rad_lock, flags); + } + + did_something = 0; + + if (dev->rad_ready) { + dev->rad_ready = 0; + spin_unlock_irqrestore(&dev->rad_lock, flags); + + rrc = RapkCQDone(dev->rad_rdma_cq, &cqid, &event_type); + + LASSERT (rrc == RAP_SUCCESS || rrc == RAP_NOT_DONE); + LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0); + + if (rrc == RAP_SUCCESS) { + kranal_process_rdmaq(cqid); + did_something = 1; + } + + rrc = RapkCQDone(dev->rad_fma_cq, &cqid, &event_type); + LASSERT (rrc == RAP_SUCCESS || rrc == RAP_NOT_DONE); + + if (rrc == RAP_SUCCESS) { + if ((event_type & RAPK_CQ_EVENT_OVERRUN) != 0) + kranal_schedule_dev(dev); + else + kranal_schedule_cqid(cqid); + did_something = 1; + } + + spin_lock_irqsave(&dev->rad_lock, flags); + + /* If there were no completions to handle, I leave + * rad_ready clear. NB I cleared it BEFORE I checked + * the completion queues since I'm racing with the + * device callback. */ + + if (did_something) + dev->rad_ready = 1; + } + + if (!list_empty(&dev->rad_connq)) { + conn = list_entry(dev->rad_connq.next, + kra_conn_t, rac_schedlist); + list_del(&conn->rac_schedlist); + spin_unlock_irqrestore(&dev->rad_lock, flags); + + LASSERT (conn->rac_scheduled); + + resched = kranal_process_fmaq(conn); + resched |= kranal_process_receives(conn); + did_something = 1; + + spin_lock_irqsave(&dev->rad_lock, flags); + if (resched) + list_add_tail(&conn->rac_schedlist, + &dev->rad_connq); + } + + if (did_something) + continue; + + add_wait_queue(&dev->rad_waitq, &wait); + set_current_state(TASK_INTERRUPTIBLE); + + spin_unlock_irqrestore(&dev->rad_lock, flags); + + busy_loops = 0; + schedule(); + + set_current_state(TASK_RUNNING); + remove_wait_queue(&dev->rad_waitq, &wait); + + spin_lock_irqsave(&dev->rad_lock, flags); + } + + spin_unlock_irqrestore(&dev->rad_lock, flags); + + kranal_thread_fini(); + return 0; +} + + +lib_nal_t kranal_lib = { + libnal_data: &kranal_data, /* NAL private data */ + libnal_send: kranal_send, + libnal_send_pages: kranal_send_pages, + libnal_recv: kranal_recv, + libnal_recv_pages: kranal_recv_pages, + libnal_dist: kranal_dist +}; -- 1.8.3.1