*/
#include "ranal.h"
+static int kranal_devids[] = {RAPK_MAIN_DEVICE_ID,
+ RAPK_EXPANSION_DEVICE_ID};
nal_t kranal_api;
ptl_handle_ni_t kranal_ni;
kra_data_t kranal_data;
kra_tunables_t kranal_tunables;
-#ifdef CONFIG_SYSCTL
#define RANAL_SYSCTL_TIMEOUT 1
#define RANAL_SYSCTL_LISTENER_TIMEOUT 2
#define RANAL_SYSCTL_BACKLOG 3
#define RANAL_SYSCTL 202
static ctl_table kranal_ctl_table[] = {
- {RANAL_SYSCTL_TIMEOUT, "timeout",
+ {RANAL_SYSCTL_TIMEOUT, "timeout",
&kranal_tunables.kra_timeout, sizeof(int),
0644, NULL, &proc_dointvec},
- {RANAL_SYSCTL_LISTENER_TIMEOUT, "listener_timeout",
+ {RANAL_SYSCTL_LISTENER_TIMEOUT, "listener_timeout",
&kranal_tunables.kra_listener_timeout, sizeof(int),
0644, NULL, &proc_dointvec},
- {RANAL_SYSCTL_BACKLOG, "backlog",
- &kranal_tunables.kra_backlog, sizeof(int),
- 0644, NULL, kranal_listener_procint},
- {RANAL_SYSCTL_PORT, "port",
- &kranal_tunables.kra_port, sizeof(int),
- 0644, NULL, kranal_listener_procint},
- {RANAL_SYSCTL_MAX_IMMEDIATE, "max_immediate",
+ {RANAL_SYSCTL_BACKLOG, "backlog",
+ &kranal_tunables.kra_backlog, sizeof(int),
+ 0644, NULL, kranal_listener_procint},
+ {RANAL_SYSCTL_PORT, "port",
+ &kranal_tunables.kra_port, sizeof(int),
+ 0644, NULL, kranal_listener_procint},
+ {RANAL_SYSCTL_MAX_IMMEDIATE, "max_immediate",
&kranal_tunables.kra_max_immediate, sizeof(int),
0644, NULL, &proc_dointvec},
{ 0 }
{RANAL_SYSCTL, "ranal", NULL, 0, 0555, kranal_ctl_table},
{ 0 }
};
-#endif
int
kranal_sock_write (struct socket *sock, void *buffer, int nob)
{
int rc;
mm_segment_t oldmm = get_fs();
- struct iovec iov = {
- .iov_base = buffer,
- .iov_len = nob
- };
- struct msghdr msg = {
- .msg_name = NULL,
- .msg_namelen = 0,
- .msg_iov = &iov,
- .msg_iovlen = 1,
- .msg_control = NULL,
- .msg_controllen = 0,
- .msg_flags = MSG_DONTWAIT
- };
-
- /* We've set up the socket's send buffer to be large enough for
- * everything we send, so a single non-blocking send should
- * complete without error. */
-
- set_fs(KERNEL_DS);
- rc = sock_sendmsg(sock, &msg, iov.iov_len);
- set_fs(oldmm);
-
- return rc;
+ struct iovec iov = {
+ .iov_base = buffer,
+ .iov_len = nob
+ };
+ struct msghdr msg = {
+ .msg_name = NULL,
+ .msg_namelen = 0,
+ .msg_iov = &iov,
+ .msg_iovlen = 1,
+ .msg_control = NULL,
+ .msg_controllen = 0,
+ .msg_flags = MSG_DONTWAIT
+ };
+
+ /* We've set up the socket's send buffer to be large enough for
+ * everything we send, so a single non-blocking send should
+ * complete without error. */
+
+ set_fs(KERNEL_DS);
+ rc = sock_sendmsg(sock, &msg, iov.iov_len);
+ set_fs(oldmm);
+
+ if (rc == nob)
+ return 0;
+
+ if (rc >= 0)
+ return -EAGAIN;
+
+ return rc;
}
int
{
int rc;
mm_segment_t oldmm = get_fs();
- long ticks = timeout * HZ;
- unsigned long then;
- struct timeval tv;
+ long ticks = timeout * HZ;
+ unsigned long then;
+ struct timeval tv;
- LASSERT (nob > 0);
- LASSERT (ticks > 0);
+ LASSERT (nob > 0);
+ LASSERT (ticks > 0);
for (;;) {
struct iovec iov = {
.msg_flags = 0
};
- /* Set receive timeout to remaining time */
- tv = (struct timeval) {
- .tv_sec = ticks / HZ,
- .tv_usec = ((ticks % HZ) * 1000000) / HZ
- };
- set_fs(KERNEL_DS);
- rc = sock_setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO,
- (char *)&tv, sizeof(tv));
- set_fs(oldmm);
- if (rc != 0) {
- CERROR("Can't set socket recv timeout %d: %d\n",
- timeout, rc);
- return rc;
- }
+ /* Set receive timeout to remaining time */
+ tv = (struct timeval) {
+ .tv_sec = ticks / HZ,
+ .tv_usec = ((ticks % HZ) * 1000000) / HZ
+ };
+ set_fs(KERNEL_DS);
+ rc = sock_setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO,
+ (char *)&tv, sizeof(tv));
+ set_fs(oldmm);
+ if (rc != 0) {
+ CERROR("Can't set socket recv timeout %d: %d\n",
+ timeout, rc);
+ return rc;
+ }
set_fs(KERNEL_DS);
- then = jiffies;
+ then = jiffies;
rc = sock_recvmsg(sock, &msg, iov.iov_len, 0);
- ticks -= jiffies - then;
+ ticks -= jiffies - then;
set_fs(oldmm);
if (rc < 0)
buffer = ((char *)buffer) + rc;
nob -= rc;
- if (nob == 0)
- return 0;
+ if (nob == 0)
+ return 0;
- if (ticks <= 0)
- return -ETIMEDOUT;
+ if (ticks <= 0)
+ return -ETIMEDOUT;
}
}
int
kranal_create_sock(struct socket **sockp)
{
- struct socket *sock;
- int rc;
- struct timeval tv;
- int option;
+ struct socket *sock;
+ int rc;
+ int option;
mm_segment_t oldmm = get_fs();
- rc = sock_create(PF_INET, SOCK_STREAM, 0, &sock);
- if (rc != 0) {
- CERROR("Can't create socket: %d\n", rc);
- return rc;
- }
-
- /* Ensure sending connection info doesn't block */
- option = 2 * sizeof(kra_connreq_t);
- set_fs(KERNEL_DS);
- rc = sock_setsockopt(sock, SOL_SOCKET, SO_SNDBUF,
- (char *)&option, sizeof(option));
- set_fs(oldmm);
- if (rc != 0) {
- CERROR("Can't set send buffer %d: %d\n", option, rc);
- goto failed;
- }
-
- option = 1;
- set_fs(KERNEL_DS);
- rc = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
- (char *)&option, sizeof(option));
- set_fs(oldmm);
- if (rc != 0) {
- CERROR("Can't set SO_REUSEADDR: %d\n", rc);
- goto failed;
- }
-
- *sockp = sock;
- return 0;
+ rc = sock_create(PF_INET, SOCK_STREAM, 0, &sock);
+ if (rc != 0) {
+ CERROR("Can't create socket: %d\n", rc);
+ return rc;
+ }
+
+ /* Ensure sending connection info doesn't block */
+ option = 2 * sizeof(kra_connreq_t);
+ set_fs(KERNEL_DS);
+ rc = sock_setsockopt(sock, SOL_SOCKET, SO_SNDBUF,
+ (char *)&option, sizeof(option));
+ set_fs(oldmm);
+ if (rc != 0) {
+ CERROR("Can't set send buffer %d: %d\n", option, rc);
+ goto failed;
+ }
+
+ option = 1;
+ set_fs(KERNEL_DS);
+ rc = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+ (char *)&option, sizeof(option));
+ set_fs(oldmm);
+ if (rc != 0) {
+ CERROR("Can't set SO_REUSEADDR: %d\n", rc);
+ goto failed;
+ }
+
+ *sockp = sock;
+ return 0;
failed:
- sock_release(sock);
- return rc;
+ sock_release(sock);
+ return rc;
}
/* Sleep uninterruptibly for 'ticks' jiffies. */
void
kranal_pause(int ticks)
{
        set_current_state(TASK_UNINTERRUPTIBLE);
        schedule_timeout(ticks);
}
void
-kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn)
+kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn, ptl_nid_t dstnid)
{
RAP_RETURN rrc;
memset(connreq, 0, sizeof(*connreq));
- connreq->racr_magic = RANAL_MSG_MAGIC;
- connreq->racr_version = RANAL_MSG_VERSION;
- connreq->racr_devid = conn->rac_device->rad_id;
- connreq->racr_nid = kranal_lib.libnal_ni.ni_pid.nid;
- connreq->racr_timeout = conn->rac_timeout;
- connreq->racr_incarnation = conn->rac_my_incarnation;
+ connreq->racr_magic = RANAL_MSG_MAGIC;
+ connreq->racr_version = RANAL_MSG_VERSION;
+ connreq->racr_devid = conn->rac_device->rad_id;
+ connreq->racr_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
+ connreq->racr_dstnid = dstnid;
+ connreq->racr_peerstamp = kranal_data.kra_peerstamp;
+ connreq->racr_connstamp = conn->rac_my_connstamp;
+ connreq->racr_timeout = conn->rac_timeout;
rrc = RapkGetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
LASSERT(rrc == RAP_SUCCESS);
/* Read and validate a connection request from 'sock', byte-swapping if the
 * peer has opposite endianness.  Returns 0 on success or -EPROTO / a
 * negative errno on any protocol violation or read failure. */
int
kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout)
{
        int         rc;

        rc = kranal_sock_read(sock, connreq, sizeof(*connreq), timeout);
        if (rc != 0) {
                CERROR("Read failed: %d\n", rc);
                return rc;
        }

        if (connreq->racr_magic != RANAL_MSG_MAGIC) {
                if (__swab32(connreq->racr_magic) != RANAL_MSG_MAGIC) {
                        CERROR("Unexpected magic %08x\n", connreq->racr_magic);
                        return -EPROTO;
                }

                /* peer is opposite-endian: fix up every multi-byte field */
                __swab32s(&connreq->racr_magic);
                __swab16s(&connreq->racr_version);
                __swab16s(&connreq->racr_devid);
                __swab64s(&connreq->racr_srcnid);
                __swab64s(&connreq->racr_dstnid);
                __swab64s(&connreq->racr_peerstamp);
                __swab64s(&connreq->racr_connstamp);
                __swab32s(&connreq->racr_timeout);
                __swab32s(&connreq->racr_riparams.HostId);
                __swab32s(&connreq->racr_riparams.FmaDomainHndl);
                __swab32s(&connreq->racr_riparams.PTag);
                __swab32s(&connreq->racr_riparams.CompletionCookie);
        }

        if (connreq->racr_version != RANAL_MSG_VERSION) {
                CERROR("Unexpected version %d\n", connreq->racr_version);
                return -EPROTO;
        }

        if (connreq->racr_srcnid == PTL_NID_ANY ||
            connreq->racr_dstnid == PTL_NID_ANY) {
                CERROR("Received PTL_NID_ANY\n");
                return -EPROTO;
        }

        if (connreq->racr_timeout < RANAL_MIN_TIMEOUT) {
                /* NOTE(review): only the CERROR argument line survived in the
                 * source; condition and message header reconstructed — confirm. */
                CERROR("Received timeout %d < MIN %d\n",
                       connreq->racr_timeout, RANAL_MIN_TIMEOUT);
                return -EPROTO;
        }

        return 0;
}
+
+int
+kranal_close_stale_conns_locked (kra_peer_t *peer, kra_conn_t *newconn)
+{
+ kra_conn_t *conn;
+ struct list_head *ctmp;
+ struct list_head *cnxt;
+ int loopback;
+ int count = 0;
+
+ loopback = peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid;
+
+ list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
+ conn = list_entry(ctmp, kra_conn_t, rac_list);
+
+ if (conn == newconn)
+ continue;
+
+ if (conn->rac_peerstamp != newconn->rac_peerstamp) {
+ CDEBUG(D_NET, "Closing stale conn nid:"LPX64
+ " peerstamp:"LPX64"("LPX64")\n", peer->rap_nid,
+ conn->rac_peerstamp, newconn->rac_peerstamp);
+ LASSERT (conn->rac_peerstamp < newconn->rac_peerstamp);
+ count++;
+ kranal_close_conn_locked(conn, -ESTALE);
+ continue;
+ }
+
+ if (conn->rac_device != newconn->rac_device)
+ continue;
+
+ if (loopback &&
+ newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
+ newconn->rac_peer_connstamp == conn->rac_my_connstamp)
+ continue;
+
+ LASSERT (conn->rac_peer_connstamp < newconn->rac_peer_connstamp);
+
+ CDEBUG(D_NET, "Closing stale conn nid:"LPX64
+ " connstamp:"LPX64"("LPX64")\n", peer->rap_nid,
+ conn->rac_peer_connstamp, newconn->rac_peer_connstamp);
+
+ count++;
+ kranal_close_conn_locked(conn, -ESTALE);
}
- return 0;
+ return count;
}
int
-kranal_conn_isdup_locked(kra_peer_t *peer, __u64 incarnation)
+kranal_conn_isdup_locked(kra_peer_t *peer, kra_conn_t *newconn)
{
- kra_conn_t *conn;
- struct list_head *tmp;
- int loopback = 0;
+ kra_conn_t *conn;
+ struct list_head *tmp;
+ int loopback;
+
+ loopback = peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid;
+
+ list_for_each(tmp, &peer->rap_conns) {
+ conn = list_entry(tmp, kra_conn_t, rac_list);
- list_for_each(tmp, &peer->rap_conns) {
- conn = list_entry(tmp, kra_conn_t, rac_list);
+ /* 'newconn' is from an earlier version of 'peer'!!! */
+ if (newconn->rac_peerstamp < conn->rac_peerstamp)
+ return 1;
- if (conn->rac_peer_incarnation < incarnation) {
- /* Conns with an older incarnation get culled later */
+ /* 'conn' is from an earlier version of 'peer': it will be
+ * removed when we cull stale conns later on... */
+ if (newconn->rac_peerstamp > conn->rac_peerstamp)
continue;
- }
- if (!loopback &&
- conn->rac_peer_incarnation == incarnation &&
- peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid) {
- /* loopback creates 2 conns */
- loopback = 1;
+ /* Different devices are OK */
+ if (conn->rac_device != newconn->rac_device)
+ continue;
+
+ /* It's me connecting to myself */
+ if (loopback &&
+ newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
+ newconn->rac_peer_connstamp == conn->rac_my_connstamp)
+ continue;
+
+ /* 'newconn' is an earlier connection from 'peer'!!! */
+ if (newconn->rac_peer_connstamp < conn->rac_peer_connstamp)
+ return 2;
+
+ /* 'conn' is an earlier connection from 'peer': it will be
+ * removed when we cull stale conns later on... */
+ if (newconn->rac_peer_connstamp > conn->rac_peer_connstamp)
continue;
- }
- return 1;
- }
+ /* 'newconn' has the SAME connection stamp; 'peer' isn't
+ * playing the game... */
+ return 3;
+ }
- return 0;
+ return 0;
}
void
write_lock_irqsave(&kranal_data.kra_global_lock, flags);
- conn->rac_my_incarnation = kranal_data.kra_next_incarnation++;
+ conn->rac_my_connstamp = kranal_data.kra_connstamp++;
do { /* allocate a unique cqid */
conn->rac_cqid = kranal_data.kra_next_cqid++;
} while (kranal_cqid2conn_locked(conn->rac_cqid) != NULL);
-
write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
}
int
-kranal_alloc_conn(kra_conn_t **connp, kra_device_t *dev)
+kranal_create_conn(kra_conn_t **connp, kra_device_t *dev)
{
- kra_conn_t *conn;
+ kra_conn_t *conn;
RAP_RETURN rrc;
LASSERT (!in_interrupt());
- PORTAL_ALLOC(conn, sizeof(*conn));
+ PORTAL_ALLOC(conn, sizeof(*conn));
- if (conn == NULL)
- return -ENOMEM;
+ if (conn == NULL)
+ return -ENOMEM;
- memset(conn, 0, sizeof(*conn));
- atomic_set(&conn->rac_refcount, 1);
- INIT_LIST_HEAD(&conn->rac_list);
- INIT_LIST_HEAD(&conn->rac_hashlist);
- INIT_LIST_HEAD(&conn->rac_fmaq);
- INIT_LIST_HEAD(&conn->rac_rdmaq);
- INIT_LIST_HEAD(&conn->rac_replyq);
- spin_lock_init(&conn->rac_lock);
+ memset(conn, 0, sizeof(*conn));
+ atomic_set(&conn->rac_refcount, 1);
+ INIT_LIST_HEAD(&conn->rac_list);
+ INIT_LIST_HEAD(&conn->rac_hashlist);
+ INIT_LIST_HEAD(&conn->rac_schedlist);
+ INIT_LIST_HEAD(&conn->rac_fmaq);
+ INIT_LIST_HEAD(&conn->rac_rdmaq);
+ INIT_LIST_HEAD(&conn->rac_replyq);
+ spin_lock_init(&conn->rac_lock);
kranal_set_conn_uniqueness(conn);
+ conn->rac_device = dev;
conn->rac_timeout = MAX(kranal_tunables.kra_timeout, RANAL_MIN_TIMEOUT);
kranal_update_reaper_timeout(conn->rac_timeout);
rrc = RapkCreateRi(dev->rad_handle, conn->rac_cqid,
- dev->rad_ptag,
- dev->rad_rdma_cq, dev->rad_fma_cq,
&conn->rac_rihandle);
if (rrc != RAP_SUCCESS) {
CERROR("RapkCreateRi failed: %d\n", rrc);
}
atomic_inc(&kranal_data.kra_nconns);
- *connp = conn;
- return 0;
+ *connp = conn;
+ return 0;
}
void
-__kranal_conn_decref(kra_conn_t *conn)
+kranal_destroy_conn(kra_conn_t *conn)
{
- kra_tx_t *tx;
RAP_RETURN rrc;
LASSERT (!in_interrupt());
LASSERT (!conn->rac_scheduled);
LASSERT (list_empty(&conn->rac_list));
LASSERT (list_empty(&conn->rac_hashlist));
+ LASSERT (list_empty(&conn->rac_schedlist));
LASSERT (atomic_read(&conn->rac_refcount) == 0);
-
- while (!list_empty(&conn->rac_fmaq)) {
- tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
-
- list_del(&tx->tx_list);
- kranal_tx_done(tx, -ECONNABORTED);
- }
-
- /* We may not destroy this connection while it has RDMAs outstanding */
+ LASSERT (list_empty(&conn->rac_fmaq));
LASSERT (list_empty(&conn->rac_rdmaq));
+ LASSERT (list_empty(&conn->rac_replyq));
- while (!list_empty(&conn->rac_replyq)) {
- tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
-
- list_del(&tx->tx_list);
- kranal_tx_done(tx, -ECONNABORTED);
- }
-
rrc = RapkDestroyRi(conn->rac_device->rad_handle,
conn->rac_rihandle);
LASSERT (rrc == RAP_SUCCESS);
if (conn->rac_peer != NULL)
kranal_peer_decref(conn->rac_peer);
- PORTAL_FREE(conn, sizeof(*conn));
+ PORTAL_FREE(conn, sizeof(*conn));
atomic_dec(&kranal_data.kra_nconns);
}
/* Move a CLOSING conn to CLOSED: unhash it (stopping new callbacks), drop
 * the hash-table ref, and schedule it so the device scheduler can complete
 * any outstanding comms.  Caller holds the global write lock. */
void
kranal_terminate_conn_locked (kra_conn_t *conn)
{
        LASSERT (!in_interrupt());
        LASSERT (conn->rac_state == RANAL_CONN_CLOSING);
        LASSERT (!list_empty(&conn->rac_hashlist));
        LASSERT (list_empty(&conn->rac_list));

        /* Remove from conn hash table: no new callbacks */
        list_del_init(&conn->rac_hashlist);
        kranal_conn_decref(conn);

        conn->rac_state = RANAL_CONN_CLOSED;

        /* schedule to clear out all uncompleted comms in context of dev's
         * scheduler */
        kranal_schedule_conn(conn);
}
void
kra_peer_t *peer = conn->rac_peer;
CDEBUG(error == 0 ? D_NET : D_ERROR,
- "closing conn to "LPX64": error %d\n", peer->rap_nid, error);
+ "closing conn to "LPX64": error %d\n", peer->rap_nid, error);
LASSERT (!in_interrupt());
- LASSERT (!conn->rac_closing);
+ LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED);
LASSERT (!list_empty(&conn->rac_hashlist));
LASSERT (!list_empty(&conn->rac_list));
kranal_unlink_peer_locked(peer);
}
- conn->rac_closing = 1;
- kranal_schedule_conn(conn);
+ /* Reset RX timeout to ensure we wait for an incoming CLOSE for the
+ * full timeout. If we get a CLOSE we know the peer has stopped all
+ * RDMA. Otherwise if we wait for the full timeout we can also be sure
+ * all RDMA has stopped. */
+ conn->rac_last_rx = jiffies;
+ mb();
+
+ conn->rac_state = RANAL_CONN_CLOSING;
+ kranal_schedule_conn(conn); /* schedule sending CLOSE */
kranal_conn_decref(conn); /* lose peer's ref */
}
/* Lock-taking wrapper: close 'conn' if it is still ESTABLISHED. */
void
kranal_close_conn (kra_conn_t *conn, int error)
{
        unsigned long flags;

        write_lock_irqsave(&kranal_data.kra_global_lock, flags);

        if (conn->rac_state == RANAL_CONN_ESTABLISHED)
                kranal_close_conn_locked(conn, error);

        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
}
int
-kranal_passive_conn_handshake (struct socket *sock,
- ptl_nid_t *peer_nidp, kra_conn_t **connp)
+kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq,
+ __u32 peer_ip, int peer_port)
+{
+ kra_device_t *dev = conn->rac_device;
+ unsigned long flags;
+ RAP_RETURN rrc;
+
+ /* CAVEAT EMPTOR: we're really overloading rac_last_tx + rac_keepalive
+ * to do RapkCompleteSync() timekeeping (see kibnal_scheduler). */
+ conn->rac_last_tx = jiffies;
+ conn->rac_keepalive = 0;
+
+ rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
+ if (rrc != RAP_SUCCESS) {
+ CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n",
+ HIPQUAD(peer_ip), peer_port, rrc);
+ return -ECONNABORTED;
+ }
+
+ /* Schedule conn on rad_new_conns */
+ kranal_conn_addref(conn);
+ spin_lock_irqsave(&dev->rad_lock, flags);
+ list_add_tail(&conn->rac_schedlist, &dev->rad_new_conns);
+ wake_up(&dev->rad_waitq);
+ spin_unlock_irqrestore(&dev->rad_lock, flags);
+
+ rrc = RapkWaitToConnect(conn->rac_rihandle);
+ if (rrc != RAP_SUCCESS) {
+ CERROR("Error waiting to connect to %u.%u.%u.%u/%d: %d\n",
+ HIPQUAD(peer_ip), peer_port, rrc);
+ return -ECONNABORTED;
+ }
+
+ /* Scheduler doesn't touch conn apart from to deschedule and decref it
+ * after RapkCompleteSync() return success, so conn is all mine */
+
+ conn->rac_peerstamp = connreq->racr_peerstamp;
+ conn->rac_peer_connstamp = connreq->racr_connstamp;
+ conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq->racr_timeout);
+ kranal_update_reaper_timeout(conn->rac_keepalive);
+ return 0;
+}
+
+int
+kranal_passive_conn_handshake (struct socket *sock, ptl_nid_t *src_nidp,
+ ptl_nid_t *dst_nidp, kra_conn_t **connp)
{
- struct sockaddr_in addr;
- __u32 peer_ip;
+ struct sockaddr_in addr;
+ __u32 peer_ip;
unsigned int peer_port;
- kra_connreq_t connreq;
- ptl_nid_t peer_nid;
+ kra_connreq_t rx_connreq;
+ kra_connreq_t tx_connreq;
kra_conn_t *conn;
kra_device_t *dev;
- RAP_RETURN rrc;
- int rc;
+ int rc;
int len;
int i;
len = sizeof(addr);
- rc = sock->ops->getname(sock, (struct sockaddr *)&addr, &len, 2);
+ rc = sock->ops->getname(sock, (struct sockaddr *)&addr, &len, 2);
if (rc != 0) {
CERROR("Can't get peer's IP: %d\n", rc);
return rc;
return -ECONNREFUSED;
}
- rc = kranal_recv_connreq(sock, &connreq,
+ rc = kranal_recv_connreq(sock, &rx_connreq,
kranal_tunables.kra_listener_timeout);
if (rc != 0) {
- CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
+ CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
HIPQUAD(peer_ip), peer_port, rc);
return rc;
}
- peer_nid = connreq.racr_nid;
- LASSERT (peer_nid != PTL_NID_ANY);
-
for (i = 0;;i++) {
- LASSERT(i < kranal_data.kra_ndevs);
+ if (i == kranal_data.kra_ndevs) {
+ CERROR("Can't match dev %d from %u.%u.%u.%u/%d\n",
+ rx_connreq.racr_devid, HIPQUAD(peer_ip), peer_port);
+ return -ENODEV;
+ }
dev = &kranal_data.kra_devices[i];
- if (dev->rad_id == connreq.racr_devid)
+ if (dev->rad_id == rx_connreq.racr_devid)
break;
}
- rc = kranal_alloc_conn(&conn, dev);
+ rc = kranal_create_conn(&conn, dev);
if (rc != 0)
return rc;
- conn->rac_peer_incarnation = connreq.racr_incarnation;
- conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq.racr_timeout);
- kranal_update_reaper_timeout(conn->rac_keepalive);
-
- rrc = RapkSetRiParams(conn->rac_rihandle, &connreq.racr_riparams);
- if (rrc != RAP_SUCCESS) {
- CERROR("Can't set riparams for "LPX64": %d\n", peer_nid, rrc);
+ kranal_pack_connreq(&tx_connreq, conn, rx_connreq.racr_srcnid);
+
+ rc = kranal_sock_write(sock, &tx_connreq, sizeof(tx_connreq));
+ if (rc != 0) {
+ CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
+ HIPQUAD(peer_ip), peer_port, rc);
kranal_conn_decref(conn);
- return -EPROTO;
+ return rc;
}
- kranal_pack_connreq(&connreq, conn);
-
- rc = kranal_sock_write(sock, &connreq, sizeof(connreq));
+ rc = kranal_set_conn_params(conn, &rx_connreq, peer_ip, peer_port);
if (rc != 0) {
- CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
- HIPQUAD(peer_ip), peer_port, rc);
kranal_conn_decref(conn);
return rc;
}
*connp = conn;
- *peer_nidp = peer_nid;
+ *src_nidp = rx_connreq.racr_srcnid;
+ *dst_nidp = rx_connreq.racr_dstnid;
return 0;
}
struct socket *sock;
unsigned int port;
int rc;
- int option;
- mm_segment_t oldmm = get_fs();
- struct timeval tv;
for (port = 1023; port >= 512; port--) {
- memset(&locaddr, 0, sizeof(locaddr));
- locaddr.sin_family = AF_INET;
+ memset(&locaddr, 0, sizeof(locaddr));
+ locaddr.sin_family = AF_INET;
locaddr.sin_port = htons(port);
locaddr.sin_addr.s_addr = htonl(INADDR_ANY);
(struct sockaddr *)&locaddr, sizeof(locaddr));
if (rc != 0) {
sock_release(sock);
-
+
if (rc == -EADDRINUSE) {
CDEBUG(D_NET, "Port %d already in use\n", port);
continue;
*sockp = sock;
return 0;
}
-
+
sock_release(sock);
if (rc != -EADDRNOTAVAIL) {
port, HIPQUAD(peer->rap_ip), peer->rap_port, rc);
return rc;
}
-
- CDEBUG(D_NET, "Port %d not available for %u.%u.%u.%u/%d\n",
+
+ CDEBUG(D_NET, "Port %d not available for %u.%u.%u.%u/%d\n",
port, HIPQUAD(peer->rap_ip), peer->rap_port);
}
int
-kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
+kranal_active_conn_handshake(kra_peer_t *peer,
+ ptl_nid_t *dst_nidp, kra_conn_t **connp)
{
- struct sockaddr_in dstaddr;
- kra_connreq_t connreq;
+ kra_connreq_t connreq;
kra_conn_t *conn;
kra_device_t *dev;
struct socket *sock;
- RAP_RETURN rrc;
- int rc;
- int idx;
-
- idx = peer->rap_nid & 0x7fffffff;
+ int rc;
+ unsigned int idx;
+
+ /* spread connections over all devices using both peer NIDs to ensure
+ * all nids use all devices */
+ idx = peer->rap_nid + kranal_lib.libnal_ni.ni_pid.nid;
dev = &kranal_data.kra_devices[idx % kranal_data.kra_ndevs];
- rc = kranal_alloc_conn(&conn, dev);
+ rc = kranal_create_conn(&conn, dev);
if (rc != 0)
return rc;
- kranal_pack_connreq(&connreq, conn);
-
+ kranal_pack_connreq(&connreq, conn, peer->rap_nid);
+
rc = ranal_connect_sock(peer, &sock);
if (rc != 0)
goto failed_0;
rc = kranal_sock_write(sock, &connreq, sizeof(connreq));
if (rc != 0) {
- CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
+ CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
HIPQUAD(peer->rap_ip), peer->rap_port, rc);
goto failed_1;
}
rc = kranal_recv_connreq(sock, &connreq, kranal_tunables.kra_timeout);
if (rc != 0) {
- CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
+ CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
HIPQUAD(peer->rap_ip), peer->rap_port, rc);
goto failed_1;
}
sock_release(sock);
rc = -EPROTO;
- if (connreq.racr_nid != peer->rap_nid) {
- CERROR("Unexpected nid from %u.%u.%u.%u/%d: "
+ if (connreq.racr_srcnid != peer->rap_nid) {
+ CERROR("Unexpected srcnid from %u.%u.%u.%u/%d: "
"received "LPX64" expected "LPX64"\n",
- HIPQUAD(peer->rap_ip), peer->rap_port,
- connreq.racr_nid, peer->rap_nid);
+ HIPQUAD(peer->rap_ip), peer->rap_port,
+ connreq.racr_srcnid, peer->rap_nid);
goto failed_0;
}
if (connreq.racr_devid != dev->rad_id) {
CERROR("Unexpected device id from %u.%u.%u.%u/%d: "
"received %d expected %d\n",
- HIPQUAD(peer->rap_ip), peer->rap_port,
+ HIPQUAD(peer->rap_ip), peer->rap_port,
connreq.racr_devid, dev->rad_id);
goto failed_0;
}
- conn->rac_peer_incarnation = connreq.racr_incarnation;
- conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq.racr_timeout);
- kranal_update_reaper_timeout(conn->rac_keepalive);
-
- rc = -ENETDOWN;
- rrc = RapkSetRiParams(conn->rac_rihandle, &connreq.racr_riparams);
- if (rrc != RAP_SUCCESS) {
- CERROR("Can't set riparams for "LPX64": %d\n",
- peer->rap_nid, rrc);
+ rc = kranal_set_conn_params(conn, &connreq,
+ peer->rap_ip, peer->rap_port);
+ if (rc != 0)
goto failed_0;
- }
*connp = conn;
- return 0;
+ *dst_nidp = connreq.racr_dstnid;
+ return 0;
failed_1:
sock_release(sock);
{
kra_peer_t *peer2;
kra_tx_t *tx;
- ptl_nid_t peer_nid;
- unsigned long flags;
- unsigned long timeout;
- kra_conn_t *conn;
- int rc;
+ ptl_nid_t peer_nid;
+ ptl_nid_t dst_nid;
+ unsigned long flags;
+ kra_conn_t *conn;
+ int rc;
int nstale;
+ int new_peer = 0;
- if (sock != NULL) {
- /* passive: listener accepted sock */
- LASSERT (peer == NULL);
+ if (sock == NULL) {
+ /* active: connd wants to connect to 'peer' */
+ LASSERT (peer != NULL);
+ LASSERT (peer->rap_connecting);
- rc = kranal_passive_conn_handshake(sock, &peer_nid, &conn);
+ rc = kranal_active_conn_handshake(peer, &dst_nid, &conn);
if (rc != 0)
return rc;
- /* assume this is a new peer */
- peer = kranal_create_peer(peer_nid);
- if (peer == NULL) {
- CERROR("Can't allocate peer for "LPX64"\n", peer_nid);
+ write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+ if (!kranal_peer_active(peer)) {
+ /* raced with peer getting unlinked */
+ write_unlock_irqrestore(&kranal_data.kra_global_lock,
+ flags);
kranal_conn_decref(conn);
- return -ENOMEM;
- }
-
- write_lock_irqsave(&kranal_data.kra_global_lock, flags);
-
- peer2 = kranal_find_peer_locked(peer_nid);
- if (peer2 == NULL) {
- /* peer table takes my initial ref on peer */
- list_add_tail(&peer->rap_list,
- kranal_nid2peerlist(peer_nid));
- } else {
- /* peer_nid already in the peer table */
- kranal_peer_decref(peer);
- peer = peer2;
- }
- /* NB I may now have a non-persistent peer in the peer
- * table with no connections: I can't drop the global lock
- * until I've given it a connection or removed it, and when
- * I do 'peer' can disappear under me. */
+ return -ESTALE;
+ }
+
+ peer_nid = peer->rap_nid;
} else {
- /* active: connd wants to connect to peer */
- LASSERT (peer != NULL);
- LASSERT (peer->rap_connecting);
-
- rc = kranal_active_conn_handshake(peer, &conn);
+ /* passive: listener accepted 'sock' */
+ LASSERT (peer == NULL);
+
+ rc = kranal_passive_conn_handshake(sock, &peer_nid,
+ &dst_nid, &conn);
if (rc != 0)
return rc;
- write_lock_irqsave(&kranal_data.kra_global_lock, flags);
-
- if (!kranal_peer_active(peer)) {
- /* raced with peer getting unlinked */
- write_unlock_irqrestore(&kranal_data.kra_global_lock,
- flags);
+ /* assume this is a new peer */
+ peer = kranal_create_peer(peer_nid);
+ if (peer == NULL) {
+ CERROR("Can't allocate peer for "LPX64"\n", peer_nid);
kranal_conn_decref(conn);
- return ESTALE;
- }
- }
-
- LASSERT (kranal_peer_active(peer)); /* peer is in the peer table */
- peer_nid = peer->rap_nid;
-
- /* Refuse to duplicate an existing connection (both sides might try
- * to connect at once). NB we return success! We _do_ have a
- * connection (so we don't need to remove the peer from the peer
- * table) and we _don't_ have any blocked txs to complete */
- if (kranal_conn_isdup_locked(peer, conn->rac_peer_incarnation)) {
+ return -ENOMEM;
+ }
+
+ write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+ peer2 = kranal_find_peer_locked(peer_nid);
+ if (peer2 == NULL) {
+ new_peer = 1;
+ } else {
+ /* peer_nid already in the peer table */
+ kranal_peer_decref(peer);
+ peer = peer2;
+ }
+ }
+
+ LASSERT ((!new_peer) != (!kranal_peer_active(peer)));
+
+ /* Refuse connection if peer thinks we are a different NID. We check
+ * this while holding the global lock, to synch with connection
+ * destruction on NID change. */
+ if (dst_nid != kranal_lib.libnal_ni.ni_pid.nid) {
+ write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+
+ CERROR("Stale/bad connection with "LPX64
+ ": dst_nid "LPX64", expected "LPX64"\n",
+ peer_nid, dst_nid, kranal_lib.libnal_ni.ni_pid.nid);
+ rc = -ESTALE;
+ goto failed;
+ }
+
+ /* Refuse to duplicate an existing connection (both sides might try to
+ * connect at once). NB we return success! We _are_ connected so we
+ * _don't_ have any blocked txs to complete with failure. */
+ rc = kranal_conn_isdup_locked(peer, conn);
+ if (rc != 0) {
LASSERT (!list_empty(&peer->rap_conns));
LASSERT (list_empty(&peer->rap_tx_queue));
write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
- CWARN("Not creating duplicate connection to "LPX64"\n",
- peer_nid);
- kranal_conn_decref(conn);
- return 0;
- }
+ CWARN("Not creating duplicate connection to "LPX64": %d\n",
+ peer_nid, rc);
+ rc = 0;
+ goto failed;
+ }
- kranal_peer_addref(peer); /* +1 ref for conn */
- conn->rac_peer = peer;
- list_add_tail(&conn->rac_list, &peer->rap_conns);
+ if (new_peer) {
+ /* peer table takes my ref on the new peer */
+ list_add_tail(&peer->rap_list,
+ kranal_nid2peerlist(peer_nid));
+ }
+
+ /* initialise timestamps before reaper looks at them */
+ conn->rac_last_tx = conn->rac_last_rx = jiffies;
+
+ kranal_peer_addref(peer); /* +1 ref for conn */
+ conn->rac_peer = peer;
+ list_add_tail(&conn->rac_list, &peer->rap_conns);
kranal_conn_addref(conn); /* +1 ref for conn table */
list_add_tail(&conn->rac_hashlist,
/* Schedule all packets blocking for a connection */
while (!list_empty(&peer->rap_tx_queue)) {
- tx = list_entry(&peer->rap_tx_queue.next,
+ tx = list_entry(peer->rap_tx_queue.next,
kra_tx_t, tx_list);
list_del(&tx->tx_list);
kranal_post_fma(conn, tx);
}
- nstale = kranal_close_stale_conns_locked(peer, conn->rac_peer_incarnation);
+ nstale = kranal_close_stale_conns_locked(peer, conn);
- write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+ write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
/* CAVEAT EMPTOR: passive peer can disappear NOW */
if (nstale != 0)
CWARN("Closed %d stale conns to "LPX64"\n", nstale, peer_nid);
+ CWARN("New connection to "LPX64" on devid[%d] = %d\n",
+ peer_nid, conn->rac_device->rad_idx, conn->rac_device->rad_id);
+
/* Ensure conn gets checked. Transmits may have been queued and an
* FMA event may have happened before it got in the cq hash table */
kranal_schedule_conn(conn);
- return 0;
+ return 0;
+
+ failed:
+ if (new_peer)
+ kranal_peer_decref(peer);
+ kranal_conn_decref(conn);
+ return rc;
}
void
LASSERT (peer->rap_connecting);
+ CDEBUG(D_NET, "About to handshake "LPX64"\n", peer->rap_nid);
+
rc = kranal_conn_handshake(NULL, peer);
+ CDEBUG(D_NET, "Done handshake "LPX64":%d \n", peer->rap_nid, rc);
+
write_lock_irqsave(&kranal_data.kra_global_lock, flags);
LASSERT (peer->rap_connecting);
/* reset reconnection timeouts */
peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL;
- peer->rap_reconnect_time = CURRENT_TIME;
+ peer->rap_reconnect_time = CURRENT_SECONDS;
write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
return;
}
LASSERT (peer->rap_reconnect_interval != 0);
- peer->rap_reconnect_time = CURRENT_TIME + peer->rap_reconnect_interval;
+ peer->rap_reconnect_time = CURRENT_SECONDS + peer->rap_reconnect_interval;
peer->rap_reconnect_interval = MAX(RANAL_MAX_RECONNECT_INTERVAL,
1 * peer->rap_reconnect_interval);
} while (!list_empty(&zombies));
}
+void
+kranal_free_acceptsock (kra_acceptsock_t *ras)
+{
+ sock_release(ras->ras_sock);
+ PORTAL_FREE(ras, sizeof(*ras));
+}
+
int
-kranal_listener(void *arg)
+kranal_listener (void *arg)
{
- struct sockaddr_in addr;
- wait_queue_t wait;
- struct socket *sock;
- struct socket *newsock;
- int port;
- kra_connreq_t *connreqs;
- char name[16];
+ struct sockaddr_in addr;
+ wait_queue_t wait;
+ struct socket *sock;
+ kra_acceptsock_t *ras;
+ int port;
+ char name[16];
int rc;
+ unsigned long flags;
- /* Parent thread holds kra_nid_mutex, and is, or is about to
- * block on kra_listener_signal */
-
- port = kranal_tunables.kra_port;
- snprintf(name, sizeof(name), "kranal_lstn%03d", port);
- kportal_daemonize(name);
- kportal_blockallsigs();
+ /* Parent thread holds kra_nid_mutex and is blocked, or about to
+ * block, on kra_listener_signal */
- init_waitqueue_entry(&wait, current);
+ port = kranal_tunables.kra_port;
+ snprintf(name, sizeof(name), "kranal_lstn%03d", port);
+ kportal_daemonize(name);
+ kportal_blockallsigs();
- rc = -ENOMEM;
- PORTAL_ALLOC(connreqs, 2 * sizeof(*connreqs));
- if (connreqs == NULL)
- goto out_0;
+ init_waitqueue_entry(&wait, current);
- rc = kranal_create_sock(&sock);
- if (rc != 0)
- goto out_1;
+ rc = kranal_create_sock(&sock);
+ if (rc != 0)
+ goto out_0;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = INADDR_ANY;
- rc = sock->ops->bind(sock, (struct sockaddr *)&addr, sizeof(addr));
- if (rc != 0) {
- CERROR("Can't bind to port %d\n", port);
- goto out_2;
- }
+ rc = sock->ops->bind(sock, (struct sockaddr *)&addr, sizeof(addr));
+ if (rc != 0) {
+ CERROR("Can't bind to port %d\n", port);
+ goto out_1;
+ }
- rc = sock->ops->listen(sock, kranal_tunables.kra_backlog);
- if (rc != 0) {
- CERROR("Can't set listen backlog %d: %d\n",
+ rc = sock->ops->listen(sock, kranal_tunables.kra_backlog);
+ if (rc != 0) {
+ CERROR("Can't set listen backlog %d: %d\n",
kranal_tunables.kra_backlog, rc);
- goto out_2;
- }
+ goto out_1;
+ }
- LASSERT (kranal_data.kra_listener_sock == NULL);
- kranal_data.kra_listener_sock = sock;
+ LASSERT (kranal_data.kra_listener_sock == NULL);
+ kranal_data.kra_listener_sock = sock;
- /* unblock waiting parent */
- LASSERT (kranal_data.kra_listener_shutdown == 0);
- up(&kranal_data.kra_listener_signal);
+ /* unblock waiting parent */
+ LASSERT (kranal_data.kra_listener_shutdown == 0);
+ up(&kranal_data.kra_listener_signal);
- /* Wake me any time something happens on my socket */
- add_wait_queue(sock->sk->sk_sleep, &wait);
+ /* Wake me any time something happens on my socket */
+ add_wait_queue(sock->sk->sk_sleep, &wait);
+ ras = NULL;
- while (kranal_data.kra_listener_shutdown == 0) {
+ while (kranal_data.kra_listener_shutdown == 0) {
- newsock = sock_alloc();
- if (newsock == NULL) {
- CERROR("Can't allocate new socket for accept\n");
- kranal_pause(HZ);
- continue;
- }
+ if (ras == NULL) {
+ PORTAL_ALLOC(ras, sizeof(*ras));
+ if (ras == NULL) {
+ CERROR("Out of Memory: pausing...\n");
+ kranal_pause(HZ);
+ continue;
+ }
+ ras->ras_sock = NULL;
+ }
+
+ if (ras->ras_sock == NULL) {
+ ras->ras_sock = sock_alloc();
+ if (ras->ras_sock == NULL) {
+ CERROR("Can't allocate socket: pausing...\n");
+ kranal_pause(HZ);
+ continue;
+ }
+ /* XXX this should add a ref to sock->ops->owner, if
+ * TCP could be a module */
+ ras->ras_sock->type = sock->type;
+ ras->ras_sock->ops = sock->ops;
+ }
+
+ set_current_state(TASK_INTERRUPTIBLE);
- set_current_state(TASK_INTERRUPTIBLE);
+ rc = sock->ops->accept(sock, ras->ras_sock, O_NONBLOCK);
- rc = sock->ops->accept(sock, newsock, O_NONBLOCK);
+ /* Sleep for socket activity? */
+ if (rc == -EAGAIN &&
+ kranal_data.kra_listener_shutdown == 0)
+ schedule();
- if (rc == -EAGAIN &&
- kranal_data.kra_listener_shutdown == 0)
- schedule();
+ set_current_state(TASK_RUNNING);
- set_current_state(TASK_RUNNING);
+ if (rc == 0) {
+ spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
- if (rc != 0) {
- sock_release(newsock);
- if (rc != -EAGAIN) {
- CERROR("Accept failed: %d\n", rc);
- kranal_pause(HZ);
- }
- continue;
- }
+ list_add_tail(&ras->ras_list,
+ &kranal_data.kra_connd_acceptq);
- kranal_conn_handshake(newsock, NULL);
- sock_release(newsock);
- }
+ spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
+ wake_up(&kranal_data.kra_connd_waitq);
- rc = 0;
- remove_wait_queue(sock->sk->sk_sleep, &wait);
- out_2:
- sock_release(sock);
- kranal_data.kra_listener_sock = NULL;
+ ras = NULL;
+ continue;
+ }
+
+ if (rc != -EAGAIN) {
+ CERROR("Accept failed: %d, pausing...\n", rc);
+ kranal_pause(HZ);
+ }
+ }
+
+ if (ras != NULL) {
+ if (ras->ras_sock != NULL)
+ sock_release(ras->ras_sock);
+ PORTAL_FREE(ras, sizeof(*ras));
+ }
+
+ rc = 0;
+ remove_wait_queue(sock->sk->sk_sleep, &wait);
out_1:
- PORTAL_FREE(connreqs, 2 * sizeof(*connreqs));
+ sock_release(sock);
+ kranal_data.kra_listener_sock = NULL;
out_0:
- /* set completion status and unblock thread waiting for me
- * (parent on startup failure, executioner on normal shutdown) */
- kranal_data.kra_listener_shutdown = rc;
- up(&kranal_data.kra_listener_signal);
+ /* set completion status and unblock thread waiting for me
+ * (parent on startup failure, executioner on normal shutdown) */
+ kranal_data.kra_listener_shutdown = rc;
+ up(&kranal_data.kra_listener_signal);
- return 0;
+ return 0;
}
int
kranal_start_listener (void)
{
- long pid;
- int rc;
+ long pid;
+ int rc;
- CDEBUG(D_WARNING, "Starting listener\n");
+ CDEBUG(D_NET, "Starting listener\n");
- /* Called holding kra_nid_mutex: listener stopped */
- LASSERT (kranal_data.kra_listener_sock == NULL);
+ /* Called holding kra_nid_mutex: listener stopped */
+ LASSERT (kranal_data.kra_listener_sock == NULL);
- kranal_data.kra_listener_shutdown == 0;
- pid = kernel_thread(kranal_listener, NULL, 0);
- if (pid < 0) {
- CERROR("Can't spawn listener: %ld\n", pid);
- return (int)pid;
- }
+ kranal_data.kra_listener_shutdown = 0;
+ pid = kernel_thread(kranal_listener, NULL, 0);
+ if (pid < 0) {
+ CERROR("Can't spawn listener: %ld\n", pid);
+ return (int)pid;
+ }
- /* Block until listener has started up. */
- down(&kranal_data.kra_listener_signal);
+ /* Block until listener has started up. */
+ down(&kranal_data.kra_listener_signal);
- rc = kranal_data.kra_listener_shutdown;
- LASSERT ((rc != 0) == (kranal_data.kra_listener_sock == NULL));
+ rc = kranal_data.kra_listener_shutdown;
+ LASSERT ((rc != 0) == (kranal_data.kra_listener_sock == NULL));
- CDEBUG(D_WARNING, "Listener %ld started OK\n", pid);
- return rc;
+ CDEBUG(D_NET, "Listener %ld started OK\n", pid);
+ return rc;
}
void
-kranal_stop_listener(void)
+kranal_stop_listener(int clear_acceptq)
{
- CDEBUG(D_WARNING, "Stopping listener\n");
+ struct list_head zombie_accepts;
+ unsigned long flags;
+ kra_acceptsock_t *ras;
+
+ CDEBUG(D_NET, "Stopping listener\n");
+
+ /* Called holding kra_nid_mutex: listener running */
+ LASSERT (kranal_data.kra_listener_sock != NULL);
+
+ kranal_data.kra_listener_shutdown = 1;
+ wake_up_all(kranal_data.kra_listener_sock->sk->sk_sleep);
- /* Called holding kra_nid_mutex: listener running */
- LASSERT (kranal_data.kra_listener_sock != NULL);
+ /* Block until listener has torn down. */
+ down(&kranal_data.kra_listener_signal);
- kranal_data.kra_listener_shutdown = 1;
- wake_up_all(kranal_data.kra_listener_sock->sk->sk_sleep);
+ LASSERT (kranal_data.kra_listener_sock == NULL);
+ CDEBUG(D_NET, "Listener stopped\n");
- /* Block until listener has torn down. */
- down(&kranal_data.kra_listener_signal);
+ if (!clear_acceptq)
+ return;
+
+ /* Close any unhandled accepts */
+ spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
+
+ list_add(&zombie_accepts, &kranal_data.kra_connd_acceptq);
+ list_del_init(&kranal_data.kra_connd_acceptq);
- LASSERT (kranal_data.kra_listener_sock == NULL);
- CDEBUG(D_WARNING, "Listener stopped\n");
+ spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
+
+ while (!list_empty(&zombie_accepts)) {
+ ras = list_entry(zombie_accepts.next,
+ kra_acceptsock_t, ras_list);
+ list_del(&ras->ras_list);
+ kranal_free_acceptsock(ras);
+ }
}
-int
+int
kranal_listener_procint(ctl_table *table, int write, struct file *filp,
- void *buffer, size_t *lenp)
+ void *buffer, size_t *lenp)
{
- int *tunable = (int *)table->data;
- int old_val;
- int rc;
+ int *tunable = (int *)table->data;
+ int old_val;
+ int rc;
+
+ /* No race with nal initialisation: the nal remains set up for as long
+ * as the module is loaded. When that changes, change this! */
+ LASSERT (kranal_data.kra_init == RANAL_INIT_ALL);
+
+ down(&kranal_data.kra_nid_mutex);
- down(&kranal_data.kra_nid_mutex);
+ LASSERT (tunable == &kranal_tunables.kra_port ||
+ tunable == &kranal_tunables.kra_backlog);
+ old_val = *tunable;
- LASSERT (tunable == &kranal_tunables.kra_port ||
- tunable == &kranal_tunables.kra_backlog);
- old_val = *tunable;
+ rc = proc_dointvec(table, write, filp, buffer, lenp);
- rc = proc_dointvec(table, write, filp, buffer, lenp);
+ if (write &&
+ (*tunable != old_val ||
+ kranal_data.kra_listener_sock == NULL)) {
- if (write &&
- (*tunable != old_val ||
- kranal_data.kra_listener_sock == NULL)) {
+ if (kranal_data.kra_listener_sock != NULL)
+ kranal_stop_listener(0);
- if (kranal_data.kra_listener_sock != NULL)
- kranal_stop_listener();
+ rc = kranal_start_listener();
- rc = kranal_start_listener();
+ if (rc != 0) {
+ CWARN("Unable to start listener with new tunable:"
+ " reverting to old value\n");
+ *tunable = old_val;
+ kranal_start_listener();
+ }
+ }
- if (rc != 0) {
- *tunable = old_val;
- kranal_start_listener();
- }
- }
+ up(&kranal_data.kra_nid_mutex);
- up(&kranal_data.kra_nid_mutex);
- return rc;
+ LASSERT (kranal_data.kra_init == RANAL_INIT_ALL);
+ return rc;
}
int
kranal_set_mynid(ptl_nid_t nid)
{
- lib_ni_t *ni = &kranal_lib.libnal_ni;
- int rc = 0;
+ unsigned long flags;
+ lib_ni_t *ni = &kranal_lib.libnal_ni;
+ int rc = 0;
CDEBUG(D_NET, "setting mynid to "LPX64" (old nid="LPX64")\n",
nid, ni->ni_pid.nid);
return 0;
}
- if (kranal_data.kra_listener_sock != NULL)
- kranal_stop_listener();
+ if (kranal_data.kra_listener_sock != NULL)
+ kranal_stop_listener(1);
+ write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+ kranal_data.kra_peerstamp++;
ni->ni_pid.nid = nid;
+ write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
/* Delete all existing peers and their connections after new
- * NID/incarnation set to ensure no old connections in our brave
+ * NID/connstamp set to ensure no old connections in our brave
* new world. */
kranal_del_peer(PTL_NID_ANY, 0);
peer->rap_nid = nid;
atomic_set(&peer->rap_refcount, 1); /* 1 ref for caller */
- INIT_LIST_HEAD(&peer->rap_list); /* not in the peer table yet */
+ INIT_LIST_HEAD(&peer->rap_list);
+ INIT_LIST_HEAD(&peer->rap_connd_list);
INIT_LIST_HEAD(&peer->rap_conns);
INIT_LIST_HEAD(&peer->rap_tx_queue);
- peer->rap_reconnect_time = CURRENT_TIME;
+ peer->rap_reconnect_time = CURRENT_SECONDS;
peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL;
atomic_inc(&kranal_data.kra_npeers);
}
void
-__kranal_peer_decref (kra_peer_t *peer)
+kranal_destroy_peer (kra_peer_t *peer)
{
CDEBUG(D_NET, "peer "LPX64" %p deleted\n", peer->rap_nid, peer);
LASSERT (atomic_read(&peer->rap_refcount) == 0);
LASSERT (peer->rap_persistence == 0);
LASSERT (!kranal_peer_active(peer));
- LASSERT (peer->rap_connecting == 0);
+ LASSERT (!peer->rap_connecting);
LASSERT (list_empty(&peer->rap_conns));
LASSERT (list_empty(&peer->rap_tx_queue));
+ LASSERT (list_empty(&peer->rap_connd_list));
PORTAL_FREE(peer, sizeof(*peer));
}
int
-kranal_get_peer_info (int index, ptl_nid_t *nidp, __u32 *ipp, int *portp,
+kranal_get_peer_info (int index, ptl_nid_t *nidp, __u32 *ipp, int *portp,
int *persistencep)
{
kra_peer_t *peer;
}
int
-kranal_close_stale_conns_locked (kra_peer_t *peer, __u64 incarnation)
-{
- kra_conn_t *conn;
- struct list_head *ctmp;
- struct list_head *cnxt;
- int count = 0;
-
- list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
- conn = list_entry(ctmp, kra_conn_t, rac_list);
-
- if (conn->rac_peer_incarnation == incarnation)
- continue;
-
- CDEBUG(D_NET, "Closing stale conn nid:"LPX64" incarnation:"LPX64"("LPX64")\n",
- peer->rap_nid, conn->rac_peer_incarnation, incarnation);
- LASSERT (conn->rac_peer_incarnation < incarnation);
-
- count++;
- kranal_close_conn_locked(conn, -ESTALE);
- }
-
- return count;
-}
-
-int
kranal_close_matching_conns (ptl_nid_t nid)
{
unsigned long flags;
break;
}
case NAL_CMD_DEL_PEER: {
- rc = kranal_del_peer(pcfg->pcfg_nid,
+ rc = kranal_del_peer(pcfg->pcfg_nid,
/* flags == single_share */
pcfg->pcfg_flags != 0);
break;
else {
rc = 0;
pcfg->pcfg_nid = conn->rac_peer->rap_nid;
- pcfg->pcfg_id = 0;
+ pcfg->pcfg_id = conn->rac_device->rad_id;
pcfg->pcfg_misc = 0;
pcfg->pcfg_flags = 0;
kranal_conn_decref(conn);
PORTAL_ALLOC(tx->tx_phys,
PTL_MD_MAX_IOV * sizeof(*tx->tx_phys));
if (tx->tx_phys == NULL) {
- CERROR("Can't allocate %stx[%d]->tx_phys\n",
+ CERROR("Can't allocate %stx[%d]->tx_phys\n",
isnblk ? "nblk " : "", i);
PORTAL_FREE(tx, sizeof(*tx));
tx->tx_isnblk = isnblk;
tx->tx_buftype = RANAL_BUF_NONE;
+ tx->tx_msg.ram_type = RANAL_MSG_NONE;
list_add(&tx->tx_list, freelist);
}
goto failed_1;
}
- rrc = RapkCreatePtag(dev->rad_handle,
- &dev->rad_ptag);
- if (rrc != RAP_SUCCESS) {
- CERROR("Can't create ptag"
- " for device %d: %d\n", id, rrc);
- goto failed_1;
- }
-
- rrc = RapkCreateCQ(dev->rad_handle, total_ntx, dev->rad_ptag,
- &dev->rad_rdma_cq);
+ rrc = RapkCreateCQ(dev->rad_handle, total_ntx, RAP_CQTYPE_SEND,
+ &dev->rad_rdma_cqh);
if (rrc != RAP_SUCCESS) {
CERROR("Can't create rdma cq size %d"
" for device %d: %d\n", total_ntx, id, rrc);
- goto failed_2;
+ goto failed_1;
}
- rrc = RapkCreateCQ(dev->rad_handle, RANAL_FMA_CQ_SIZE,
- dev->rad_ptag, &dev->rad_fma_cq);
+ rrc = RapkCreateCQ(dev->rad_handle, RANAL_FMA_CQ_SIZE, RAP_CQTYPE_RECV,
+ &dev->rad_fma_cqh);
if (rrc != RAP_SUCCESS) {
CERROR("Can't create fma cq size %d"
" for device %d: %d\n", RANAL_FMA_CQ_SIZE, id, rrc);
- goto failed_3;
+ goto failed_2;
}
return 0;
- failed_3:
- RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag);
failed_2:
- RapkDestroyPtag(dev->rad_handle, dev->rad_ptag);
+ RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh);
failed_1:
RapkReleaseDevice(dev->rad_handle);
failed_0:
void
kranal_device_fini(kra_device_t *dev)
{
- RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cq, dev->rad_ptag);
- RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag);
- RapkDestroyPtag(dev->rad_handle, dev->rad_ptag);
+ LASSERT(dev->rad_scheduler == NULL);
+ RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cqh);
+ RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh);
RapkReleaseDevice(dev->rad_handle);
}
kranal_api_shutdown (nal_t *nal)
{
int i;
- int rc;
unsigned long flags;
-
+
if (nal->nal_refct != 0) {
/* This module got the first ref */
PORTAL_MODULE_UNUSE;
"waiting for %d peers and %d conns to close down\n",
atomic_read(&kranal_data.kra_npeers),
atomic_read(&kranal_data.kra_nconns));
- kranal_pause(HZ);
+ kranal_pause(HZ);
}
/* fall through */
break;
}
+ /* Conn/Peer state all cleaned up BEFORE setting shutdown, so threads
+ * don't have to worry about shutdown races */
+ LASSERT (atomic_read(&kranal_data.kra_nconns) == 0);
+ LASSERT (atomic_read(&kranal_data.kra_npeers) == 0);
+
/* flag threads to terminate; wake and wait for them to die */
kranal_data.kra_shutdown = 1;
for (i = 0; i < kranal_data.kra_ndevs; i++) {
kra_device_t *dev = &kranal_data.kra_devices[i];
- LASSERT (list_empty(&dev->rad_connq));
-
+ LASSERT (list_empty(&dev->rad_ready_conns));
+ LASSERT (list_empty(&dev->rad_new_conns));
+ LASSERT (dev->rad_nphysmap == 0);
+ LASSERT (dev->rad_nppphysmap == 0);
+ LASSERT (dev->rad_nvirtmap == 0);
+ LASSERT (dev->rad_nobvirtmap == 0);
+
spin_lock_irqsave(&dev->rad_lock, flags);
wake_up(&dev->rad_waitq);
spin_unlock_irqrestore(&dev->rad_lock, flags);
spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
LASSERT (list_empty(&kranal_data.kra_connd_peers));
- spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
+ spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
wake_up_all(&kranal_data.kra_connd_waitq);
- spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
+ spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
i = 2;
while (atomic_read(&kranal_data.kra_nthreads) != 0) {
LASSERT (list_empty(&kranal_data.kra_peers[i]));
PORTAL_FREE(kranal_data.kra_peers,
- sizeof (struct list_head) *
+ sizeof (struct list_head) *
kranal_data.kra_peer_hash_size);
}
LASSERT (list_empty(&kranal_data.kra_conns[i]));
PORTAL_FREE(kranal_data.kra_conns,
- sizeof (struct list_head) *
+ sizeof (struct list_head) *
kranal_data.kra_conn_hash_size);
}
ptl_ni_limits_t *requested_limits,
ptl_ni_limits_t *actual_limits)
{
- static int device_ids[] = {RAPK_MAIN_DEVICE_ID,
- RAPK_EXPANSION_DEVICE_ID};
struct timeval tv;
ptl_process_id_t process_id;
int pkmem = atomic_read(&portal_kmemory);
memset(&kranal_data, 0, sizeof(kranal_data)); /* zero pointers, flags etc */
/* CAVEAT EMPTOR: Every 'Fma' message includes the sender's NID and
- * a unique (for all time) incarnation so we can uniquely identify
- * the sender. The incarnation is an incrementing counter
+ * a unique (for all time) connstamp so we can uniquely identify
+ * the sender. The connstamp is an incrementing counter
* initialised with seconds + microseconds at startup time. So we
* rely on NOT creating connections more frequently on average than
- * 1MHz to ensure we don't use old incarnations when we reboot. */
+ * 1MHz to ensure we don't use old connstamps when we reboot. */
do_gettimeofday(&tv);
- kranal_data.kra_next_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
+ kranal_data.kra_connstamp =
+ kranal_data.kra_peerstamp = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
init_MUTEX(&kranal_data.kra_nid_mutex);
init_MUTEX_LOCKED(&kranal_data.kra_listener_signal);
kra_device_t *dev = &kranal_data.kra_devices[i];
dev->rad_idx = i;
- INIT_LIST_HEAD(&dev->rad_connq);
+ INIT_LIST_HEAD(&dev->rad_ready_conns);
+ INIT_LIST_HEAD(&dev->rad_new_conns);
init_waitqueue_head(&dev->rad_waitq);
spin_lock_init(&dev->rad_lock);
}
+ kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
init_waitqueue_head(&kranal_data.kra_reaper_waitq);
spin_lock_init(&kranal_data.kra_reaper_lock);
+ INIT_LIST_HEAD(&kranal_data.kra_connd_acceptq);
INIT_LIST_HEAD(&kranal_data.kra_connd_peers);
init_waitqueue_head(&kranal_data.kra_connd_waitq);
spin_lock_init(&kranal_data.kra_connd_lock);
/* OK to call kranal_api_shutdown() to cleanup now */
kranal_data.kra_init = RANAL_INIT_DATA;
-
+
kranal_data.kra_peer_hash_size = RANAL_PEER_HASH_SIZE;
PORTAL_ALLOC(kranal_data.kra_peers,
sizeof(struct list_head) * kranal_data.kra_peer_hash_size);
}
for (i = 0; i < RANAL_N_CONND; i++) {
- rc = kranal_thread_start(kranal_connd, (void *)i);
+ rc = kranal_thread_start(kranal_connd, (void *)(unsigned long)i);
if (rc != 0) {
CERROR("Can't spawn ranal connd[%d]: %d\n",
i, rc);
}
}
- LASSERT(kranal_data.kra_ndevs == 0);
- for (i = 0; i < sizeof(device_ids)/sizeof(device_ids[0]); i++) {
+ LASSERT (kranal_data.kra_ndevs == 0);
+
+ for (i = 0; i < sizeof(kranal_devids)/sizeof(kranal_devids[0]); i++) {
+ LASSERT (i < RANAL_MAXDEVS);
+
dev = &kranal_data.kra_devices[kranal_data.kra_ndevs];
- rc = kranal_device_init(device_ids[i], dev);
+ rc = kranal_device_init(kranal_devids[i], dev);
if (rc == 0)
kranal_data.kra_ndevs++;
-
+ }
+
+ if (kranal_data.kra_ndevs == 0) {
+ CERROR("Can't initialise any RapidArray devices\n");
+ goto failed;
+ }
+
+ for (i = 0; i < kranal_data.kra_ndevs; i++) {
+ dev = &kranal_data.kra_devices[i];
rc = kranal_thread_start(kranal_scheduler, dev);
if (rc != 0) {
CERROR("Can't spawn ranal scheduler[%d]: %d\n",
}
}
- if (kranal_data.kra_ndevs == 0)
- goto failed;
-
rc = libcfs_nal_cmd_register(RANAL, &kranal_cmd, NULL);
if (rc != 0) {
CERROR("Can't initialise command interface (rc = %d)\n", rc);
return PTL_OK;
failed:
- kranal_api_shutdown(&kranal_api);
+ kranal_api_shutdown(&kranal_api);
return PTL_FAIL;
}
void __exit
kranal_module_fini (void)
{
-#ifdef CONFIG_SYSCTL
if (kranal_tunables.kra_sysctl != NULL)
unregister_sysctl_table(kranal_tunables.kra_sysctl);
-#endif
+
PtlNIFini(kranal_ni);
ptl_unregister_nal(RANAL);
/* Initialise dynamic tunables to defaults once only */
kranal_tunables.kra_timeout = RANAL_TIMEOUT;
+ kranal_tunables.kra_listener_timeout = RANAL_LISTENER_TIMEOUT;
+ kranal_tunables.kra_backlog = RANAL_BACKLOG;
+ kranal_tunables.kra_port = RANAL_PORT;
+ kranal_tunables.kra_max_immediate = RANAL_MAX_IMMEDIATE;
rc = ptl_register_nal(RANAL, &kranal_api);
if (rc != PTL_OK) {
return -ENODEV;
}
-#ifdef CONFIG_SYSCTL
- /* Press on regardless even if registering sysctl doesn't work */
- kranal_tunables.kra_sysctl =
+ kranal_tunables.kra_sysctl =
register_sysctl_table(kranal_top_ctl_table, 0);
-#endif
+ if (kranal_tunables.kra_sysctl == NULL) {
+ CERROR("Can't register sysctl table\n");
+ PtlNIFini(kranal_ni);
+ ptl_unregister_nal(RANAL);
+ return -ENOMEM;
+ }
+
return 0;
}