{
struct socket *sock;
int rc;
- struct timeval tv;
int option;
mm_segment_t oldmm = get_fs();
memset(connreq, 0, sizeof(*connreq));
- connreq->racr_magic = RANAL_MSG_MAGIC;
- connreq->racr_version = RANAL_MSG_VERSION;
- connreq->racr_devid = conn->rac_device->rad_id;
- connreq->racr_nid = kranal_lib.libnal_ni.ni_pid.nid;
- connreq->racr_timeout = conn->rac_timeout;
- connreq->racr_incarnation = conn->rac_my_incarnation;
+ connreq->racr_magic = RANAL_MSG_MAGIC;
+ connreq->racr_version = RANAL_MSG_VERSION;
+ connreq->racr_devid = conn->rac_device->rad_id;
+ connreq->racr_nid = kranal_lib.libnal_ni.ni_pid.nid;
+ connreq->racr_peerstamp = kranal_data.kra_peerstamp;
+ connreq->racr_connstamp = conn->rac_my_connstamp;
+ connreq->racr_timeout = conn->rac_timeout;
rrc = RapkGetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
LASSERT(rrc == RAP_SUCCESS);
int
kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout)
{
- int i;
int rc;
rc = kranal_sock_read(sock, connreq, sizeof(*connreq), timeout);
__swab16s(&connreq->racr_version);
__swab16s(&connreq->racr_devid);
__swab64s(&connreq->racr_nid);
- __swab64s(&connreq->racr_incarnation);
+ __swab64s(&connreq->racr_peerstamp);
+ __swab64s(&connreq->racr_connstamp);
__swab32s(&connreq->racr_timeout);
__swab32s(&connreq->racr_riparams.FmaDomainHndl);
return -EPROTO;
}
- for (i = 0; i < kranal_data.kra_ndevs; i++)
- if (connreq->racr_devid ==
- kranal_data.kra_devices[i].rad_id)
- break;
+ return 0;
+}
- if (i == kranal_data.kra_ndevs) {
- CERROR("Can't match device %d\n", connreq->racr_devid);
- return -ENODEV;
+int
+kranal_close_stale_conns_locked (kra_peer_t *peer, kra_conn_t *newconn)
+{
+ kra_conn_t *conn;
+ struct list_head *ctmp;
+ struct list_head *cnxt;
+ int loopback;
+ int count = 0;
+
+ loopback = peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid;
+
+ list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
+ conn = list_entry(ctmp, kra_conn_t, rac_list);
+
+ if (conn == newconn)
+ continue;
+
+ if (conn->rac_peerstamp != newconn->rac_peerstamp) {
+ CDEBUG(D_NET, "Closing stale conn nid:"LPX64
+ " peerstamp:"LPX64"("LPX64")\n", peer->rap_nid,
+ conn->rac_peerstamp, newconn->rac_peerstamp);
+ LASSERT (conn->rac_peerstamp < newconn->rac_peerstamp);
+ count++;
+ kranal_close_conn_locked(conn, -ESTALE);
+ continue;
+ }
+
+ if (conn->rac_device != newconn->rac_device)
+ continue;
+
+ if (loopback &&
+ newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
+ newconn->rac_peer_connstamp == conn->rac_my_connstamp)
+ continue;
+
+ LASSERT (conn->rac_peer_connstamp < newconn->rac_peer_connstamp);
+
+ CDEBUG(D_NET, "Closing stale conn nid:"LPX64
+ " connstamp:"LPX64"("LPX64")\n", peer->rap_nid,
+ conn->rac_peer_connstamp, newconn->rac_peer_connstamp);
+
+ count++;
+ kranal_close_conn_locked(conn, -ESTALE);
}
- return 0;
+ return count;
}
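
For reference, the ordering rule this function enforces can be written as a standalone predicate. A minimal sketch, assuming only that stamps increase monotonically (the struct and helper names below are illustrative, not part of the patch):

#include <stdint.h>

/* peerstamp orders instances of the peer (one per boot/restart);
 * connstamp orders connections within a single instance. */
struct kra_stamps { uint64_t peerstamp; uint64_t connstamp; };

static int conn_is_stale(struct kra_stamps old, struct kra_stamps latest)
{
        if (old.peerstamp != latest.peerstamp)
                return old.peerstamp < latest.peerstamp; /* earlier instance */
        return old.connstamp < latest.connstamp;         /* earlier connection */
}

The LASSERTs above encode the same invariant: any conn that survives duplicate detection but differs in either stamp must be strictly older than the new one.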
int
-kranal_conn_isdup_locked(kra_peer_t *peer, __u64 incarnation)
+kranal_conn_isdup_locked(kra_peer_t *peer, kra_conn_t *newconn)
{
kra_conn_t *conn;
struct list_head *tmp;
- int loopback = 0;
+ int loopback;
+ loopback = peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid;
+
list_for_each(tmp, &peer->rap_conns) {
conn = list_entry(tmp, kra_conn_t, rac_list);
- if (conn->rac_peer_incarnation < incarnation) {
- /* Conns with an older incarnation get culled later */
+ /* 'newconn' is from an earlier version of 'peer'!!! */
+ if (newconn->rac_peerstamp < conn->rac_peerstamp)
+ return 1;
+
+ /* 'conn' is from an earlier version of 'peer': it will be
+ * removed when we cull stale conns later on... */
+ if (newconn->rac_peerstamp > conn->rac_peerstamp)
continue;
- }
- if (!loopback &&
- conn->rac_peer_incarnation == incarnation &&
- peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid) {
- /* loopback creates 2 conns */
- loopback = 1;
+ /* Different devices are OK */
+ if (conn->rac_device != newconn->rac_device)
+ continue;
+
+ /* It's me connecting to myself */
+ if (loopback &&
+ newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
+ newconn->rac_peer_connstamp == conn->rac_my_connstamp)
continue;
- }
- return 1;
+ /* 'newconn' is an earlier connection from 'peer'!!! */
+ if (newconn->rac_peer_connstamp < conn->rac_peer_connstamp)
+ return 2;
+
+ /* 'conn' is an earlier connection from 'peer': it will be
+ * removed when we cull stale conns later on... */
+ if (newconn->rac_peer_connstamp > conn->rac_peer_connstamp)
+ continue;
+
+ /* 'newconn' has the SAME connection stamp; 'peer' isn't
+ * playing the game... */
+ return 3;
}
return 0;
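
Summarising the non-zero returns above (taken directly from the comments in the loop), since the caller only logs the value:

/* kranal_conn_isdup_locked() return values:
 *   0 - not a duplicate: keep newconn
 *   1 - newconn is from an earlier instance of the peer
 *   2 - newconn is an earlier connection from the current instance
 *   3 - newconn repeats an existing connstamp (peer isn't playing the game)
 */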
write_lock_irqsave(&kranal_data.kra_global_lock, flags);
- conn->rac_my_incarnation = kranal_data.kra_next_incarnation++;
+ conn->rac_my_connstamp = kranal_data.kra_connstamp++;
do { /* allocate a unique cqid */
conn->rac_cqid = kranal_data.kra_next_cqid++;
}
int
-kranal_alloc_conn(kra_conn_t **connp, kra_device_t *dev)
+kranal_create_conn(kra_conn_t **connp, kra_device_t *dev)
{
kra_conn_t *conn;
RAP_RETURN rrc;
atomic_set(&conn->rac_refcount, 1);
INIT_LIST_HEAD(&conn->rac_list);
INIT_LIST_HEAD(&conn->rac_hashlist);
+ INIT_LIST_HEAD(&conn->rac_schedlist);
INIT_LIST_HEAD(&conn->rac_fmaq);
INIT_LIST_HEAD(&conn->rac_rdmaq);
INIT_LIST_HEAD(&conn->rac_replyq);
}
void
-__kranal_conn_decref(kra_conn_t *conn)
+kranal_destroy_conn(kra_conn_t *conn)
{
- kra_tx_t *tx;
RAP_RETURN rrc;
LASSERT (!in_interrupt());
LASSERT (!conn->rac_scheduled);
LASSERT (list_empty(&conn->rac_list));
LASSERT (list_empty(&conn->rac_hashlist));
+ LASSERT (list_empty(&conn->rac_schedlist));
LASSERT (atomic_read(&conn->rac_refcount) == 0);
-
- while (!list_empty(&conn->rac_fmaq)) {
- tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
-
- list_del(&tx->tx_list);
- kranal_tx_done(tx, -ECONNABORTED);
- }
-
- /* We may not destroy this connection while it has RDMAs outstanding */
+ LASSERT (list_empty(&conn->rac_fmaq));
LASSERT (list_empty(&conn->rac_rdmaq));
+ LASSERT (list_empty(&conn->rac_replyq));
- while (!list_empty(&conn->rac_replyq)) {
- tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
-
- list_del(&tx->tx_list);
- kranal_tx_done(tx, -ECONNABORTED);
- }
-
rrc = RapkDestroyRi(conn->rac_device->rad_handle,
conn->rac_rihandle);
LASSERT (rrc == RAP_SUCCESS);
void
kranal_terminate_conn_locked (kra_conn_t *conn)
{
- kra_peer_t *peer = conn->rac_peer;
-
LASSERT (!in_interrupt());
- LASSERT (conn->rac_closing);
+ LASSERT (conn->rac_state == RANAL_CONN_CLOSING);
LASSERT (!list_empty(&conn->rac_hashlist));
LASSERT (list_empty(&conn->rac_list));
- /* Remove from conn hash table (no new callbacks) */
+ /* Remove from conn hash table: no new callbacks */
list_del_init(&conn->rac_hashlist);
kranal_conn_decref(conn);
- /* Conn is now just waiting for remaining refs to go */
+ conn->rac_state = RANAL_CONN_CLOSED;
+
+ /* schedule to clear out all uncompleted comms in the context of the
+ * dev's scheduler */
+ kranal_schedule_conn(conn);
}
void
"closing conn to "LPX64": error %d\n", peer->rap_nid, error);
LASSERT (!in_interrupt());
- LASSERT (!conn->rac_closing);
+ LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED);
LASSERT (!list_empty(&conn->rac_hashlist));
LASSERT (!list_empty(&conn->rac_list));
/* Non-persistent peer with no more conns... */
kranal_unlink_peer_locked(peer);
}
+
+ /* Reset RX timeout to ensure we wait for an incoming CLOSE for the
+ * full timeout */
+ conn->rac_last_rx = jiffies;
+ mb();
- conn->rac_closing = 1;
- kranal_schedule_conn(conn);
+ conn->rac_state = RANAL_CONN_CLOSING;
+ kranal_schedule_conn(conn); /* schedule sending CLOSE */
kranal_conn_decref(conn); /* lose peer's ref */
}
write_lock_irqsave(&kranal_data.kra_global_lock, flags);
- if (!conn->rac_closing)
+ if (conn->rac_state == RANAL_CONN_ESTABLISHED)
kranal_close_conn_locked(conn, error);
write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
}
int
+kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq,
+ __u32 peer_ip, int peer_port)
+{
+ RAP_RETURN rrc;
+
+ rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
+ if (rrc != RAP_SUCCESS) {
+ CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n",
+ HIPQUAD(peer_ip), peer_port, rrc);
+ return -EPROTO;
+ }
+
+ conn->rac_peerstamp = connreq->racr_peerstamp;
+ conn->rac_peer_connstamp = connreq->racr_connstamp;
+ conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq->racr_timeout);
+ kranal_update_reaper_timeout(conn->rac_keepalive);
+ return 0;
+}
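
Both handshake paths below now funnel through this helper instead of open-coding RapkSetRiParams() and the stamp/keepalive bookkeeping. A condensed sketch of the shared call pattern (simplified; the active path jumps to its failed_0 cleanup label rather than returning directly):

rc = kranal_set_conn_params(conn, &connreq, ip, port);
if (rc != 0) {
        kranal_conn_decref(conn);   /* drop the conn just created */
        return rc;
}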
+
+int
kranal_passive_conn_handshake (struct socket *sock,
ptl_nid_t *peer_nidp, kra_conn_t **connp)
{
ptl_nid_t peer_nid;
kra_conn_t *conn;
kra_device_t *dev;
- RAP_RETURN rrc;
int rc;
int len;
int i;
LASSERT (peer_nid != PTL_NID_ANY);
for (i = 0;;i++) {
- LASSERT(i < kranal_data.kra_ndevs);
+ if (i == kranal_data.kra_ndevs) {
+ CERROR("Can't match dev %d from %u.%u.%u.%u/%d\n",
+ connreq.racr_devid, HIPQUAD(peer_ip), peer_port);
+ return -ENODEV;
+ }
dev = &kranal_data.kra_devices[i];
if (dev->rad_id == connreq.racr_devid)
break;
}
- rc = kranal_alloc_conn(&conn, dev);
+ rc = kranal_create_conn(&conn, dev);
if (rc != 0)
return rc;
- conn->rac_peer_incarnation = connreq.racr_incarnation;
- conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq.racr_timeout);
- kranal_update_reaper_timeout(conn->rac_keepalive);
-
- rrc = RapkSetRiParams(conn->rac_rihandle, &connreq.racr_riparams);
- if (rrc != RAP_SUCCESS) {
- CERROR("Can't set riparams for "LPX64": %d\n", peer_nid, rrc);
+ rc = kranal_set_conn_params(conn, &connreq, peer_ip, peer_port);
+ if (rc != 0) {
kranal_conn_decref(conn);
- return -EPROTO;
+ return rc;
}
kranal_pack_connreq(&connreq, conn);
struct socket *sock;
unsigned int port;
int rc;
- int option;
- mm_segment_t oldmm = get_fs();
- struct timeval tv;
for (port = 1023; port >= 512; port--) {
int
kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
{
- struct sockaddr_in dstaddr;
kra_connreq_t connreq;
kra_conn_t *conn;
kra_device_t *dev;
struct socket *sock;
- RAP_RETURN rrc;
int rc;
int idx;
idx = peer->rap_nid & 0x7fffffff;
dev = &kranal_data.kra_devices[idx % kranal_data.kra_ndevs];
- rc = kranal_alloc_conn(&conn, dev);
+ rc = kranal_create_conn(&conn, dev);
if (rc != 0)
return rc;
goto failed_0;
}
- conn->rac_peer_incarnation = connreq.racr_incarnation;
- conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq.racr_timeout);
- kranal_update_reaper_timeout(conn->rac_keepalive);
-
- rc = -ENETDOWN;
- rrc = RapkSetRiParams(conn->rac_rihandle, &connreq.racr_riparams);
- if (rrc != RAP_SUCCESS) {
- CERROR("Can't set riparams for "LPX64": %d\n",
- peer->rap_nid, rrc);
+ rc = kranal_set_conn_params(conn, &connreq,
+ peer->rap_ip, peer->rap_port);
+ if (rc != 0)
goto failed_0;
- }
*connp = conn;
return 0;
kra_tx_t *tx;
ptl_nid_t peer_nid;
unsigned long flags;
- unsigned long timeout;
kra_conn_t *conn;
int rc;
int nstale;
- if (sock != NULL) {
- /* passive: listener accepted sock */
+ if (sock == NULL) {
+ /* active: connd wants to connect to 'peer' */
+ LASSERT (peer != NULL);
+ LASSERT (peer->rap_connecting);
+
+ rc = kranal_active_conn_handshake(peer, &conn);
+ if (rc != 0)
+ return rc;
+
+ write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+ if (!kranal_peer_active(peer)) {
+ /* raced with peer getting unlinked */
+ write_unlock_irqrestore(&kranal_data.kra_global_lock,
+ flags);
+ kranal_conn_decref(conn);
+ return ESTALE;
+ }
+
+ peer_nid = peer->rap_nid;
+
+ } else {
+ /* passive: listener accepted 'sock' */
LASSERT (peer == NULL);
rc = kranal_passive_conn_handshake(sock, &peer_nid, &conn);
* table with no connections: I can't drop the global lock
* until I've given it a connection or removed it, and when
* I do 'peer' can disappear under me. */
- } else {
- /* active: connd wants to connect to peer */
- LASSERT (peer != NULL);
- LASSERT (peer->rap_connecting);
-
- rc = kranal_active_conn_handshake(peer, &conn);
- if (rc != 0)
- return rc;
-
- write_lock_irqsave(&kranal_data.kra_global_lock, flags);
-
- if (!kranal_peer_active(peer)) {
- /* raced with peer getting unlinked */
- write_unlock_irqrestore(&kranal_data.kra_global_lock,
- flags);
- kranal_conn_decref(conn);
- return ESTALE;
- }
- }
+ }
LASSERT (kranal_peer_active(peer)); /* peer is in the peer table */
- peer_nid = peer->rap_nid;
/* Refuse to duplicate an existing connection (both sides might try
* to connect at once). NB we return success! We _do_ have a
* connection (so we don't need to remove the peer from the peer
* table) and we _don't_ have any blocked txs to complete */
- if (kranal_conn_isdup_locked(peer, conn->rac_peer_incarnation)) {
+ rc = kranal_conn_isdup_locked(peer, conn);
+ if (rc != 0) {
LASSERT (!list_empty(&peer->rap_conns));
LASSERT (list_empty(&peer->rap_tx_queue));
write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
- CWARN("Not creating duplicate connection to "LPX64"\n",
- peer_nid);
+ CWARN("Not creating duplicate connection to "LPX64": %d\n",
+ peer_nid, rc);
kranal_conn_decref(conn);
return 0;
}
kranal_post_fma(conn, tx);
}
- nstale = kranal_close_stale_conns_locked(peer, conn->rac_peer_incarnation);
+ nstale = kranal_close_stale_conns_locked(peer, conn);
write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
/* Called holding kra_nid_mutex: listener stopped */
LASSERT (kranal_data.kra_listener_sock == NULL);
- kranal_data.kra_listener_shutdown == 0;
+ kranal_data.kra_listener_shutdown = 0;
pid = kernel_thread(kranal_listener, NULL, 0);
if (pid < 0) {
CERROR("Can't spawn listener: %ld\n", pid);
int old_val;
int rc;
+ /* No race with nal initialisation since the nal is set up all the time
+ * it's loaded. When that changes, change this! */
+ LASSERT (kranal_data.kra_init == RANAL_INIT_ALL);
+
down(&kranal_data.kra_nid_mutex);
LASSERT (tunable == &kranal_tunables.kra_port ||
rc = kranal_start_listener();
if (rc != 0) {
+ CWARN("Unable to start listener with new tunable:"
+ " reverting to old value\n");
*tunable = old_val;
kranal_start_listener();
}
}
up(&kranal_data.kra_nid_mutex);
+
+ LASSERT (kranal_data.kra_init == RANAL_INIT_ALL);
return rc;
}
int
kranal_set_mynid(ptl_nid_t nid)
{
+ unsigned long flags;
lib_ni_t *ni = &kranal_lib.libnal_ni;
int rc = 0;
if (kranal_data.kra_listener_sock != NULL)
kranal_stop_listener();
+ write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+ kranal_data.kra_peerstamp++;
+ write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+
ni->ni_pid.nid = nid;
/* Delete all existing peers and their connections after new
- * NID/incarnation set to ensure no old connections in our brave
+ * NID/connstamp set to ensure no old connections in our brave
* new world. */
kranal_del_peer(PTL_NID_ANY, 0);
peer->rap_nid = nid;
atomic_set(&peer->rap_refcount, 1); /* 1 ref for caller */
- INIT_LIST_HEAD(&peer->rap_list); /* not in the peer table yet */
+ INIT_LIST_HEAD(&peer->rap_list);
+ INIT_LIST_HEAD(&peer->rap_connd_list);
INIT_LIST_HEAD(&peer->rap_conns);
INIT_LIST_HEAD(&peer->rap_tx_queue);
}
void
-__kranal_peer_decref (kra_peer_t *peer)
+kranal_destroy_peer (kra_peer_t *peer)
{
CDEBUG(D_NET, "peer "LPX64" %p deleted\n", peer->rap_nid, peer);
LASSERT (atomic_read(&peer->rap_refcount) == 0);
LASSERT (peer->rap_persistence == 0);
LASSERT (!kranal_peer_active(peer));
- LASSERT (peer->rap_connecting == 0);
+ LASSERT (!peer->rap_connecting);
LASSERT (list_empty(&peer->rap_conns));
LASSERT (list_empty(&peer->rap_tx_queue));
+ LASSERT (list_empty(&peer->rap_connd_list));
PORTAL_FREE(peer, sizeof(*peer));
}
int
-kranal_close_stale_conns_locked (kra_peer_t *peer, __u64 incarnation)
-{
- kra_conn_t *conn;
- struct list_head *ctmp;
- struct list_head *cnxt;
- int count = 0;
-
- list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
- conn = list_entry(ctmp, kra_conn_t, rac_list);
-
- if (conn->rac_peer_incarnation == incarnation)
- continue;
-
- CDEBUG(D_NET, "Closing stale conn nid:"LPX64" incarnation:"LPX64"("LPX64")\n",
- peer->rap_nid, conn->rac_peer_incarnation, incarnation);
- LASSERT (conn->rac_peer_incarnation < incarnation);
-
- count++;
- kranal_close_conn_locked(conn, -ESTALE);
- }
-
- return count;
-}
-
-int
kranal_close_matching_conns (ptl_nid_t nid)
{
unsigned long flags;
tx->tx_isnblk = isnblk;
tx->tx_buftype = RANAL_BUF_NONE;
+ tx->tx_msg.ram_type = RANAL_MSG_NONE;
list_add(&tx->tx_list, freelist);
}
void
kranal_device_fini(kra_device_t *dev)
{
+ LASSERT(dev->rad_scheduler == NULL);
RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cq, dev->rad_ptag);
RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag);
RapkDestroyPtag(dev->rad_handle, dev->rad_ptag);
kranal_api_shutdown (nal_t *nal)
{
int i;
- int rc;
unsigned long flags;
if (nal->nal_refct != 0) {
memset(&kranal_data, 0, sizeof(kranal_data)); /* zero pointers, flags etc */
/* CAVEAT EMPTOR: Every 'Fma' message includes the sender's NID and
- * a unique (for all time) incarnation so we can uniquely identify
- * the sender. The incarnation is an incrementing counter
+ * a unique (for all time) connstamp so we can uniquely identify
+ * the sender. The connstamp is an incrementing counter
* initialised with seconds + microseconds at startup time. So we
* rely on NOT creating connections more frequently on average than
- * 1MHz to ensure we don't use old incarnations when we reboot. */
+ * 1MHz to ensure we don't use old connstamps when we reboot. */
do_gettimeofday(&tv);
- kranal_data.kra_next_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
+ kranal_data.kra_connstamp =
+ kranal_data.kra_peerstamp = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
init_MUTEX(&kranal_data.kra_nid_mutex);
init_MUTEX_LOCKED(&kranal_data.kra_listener_signal);
spin_lock_init(&dev->rad_lock);
}
+ kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
init_waitqueue_head(&kranal_data.kra_reaper_waitq);
spin_lock_init(&kranal_data.kra_reaper_lock);
#include <rapl.h>
-#if CONFIG_SMP
-# define RANAL_N_SCHED num_online_cpus() /* # schedulers */
-#else
-# define RANAL_N_SCHED 1 /* # schedulers */
-#endif
-
#define RANAL_MAXDEVS 2 /* max # devices RapidArray supports */
#define RANAL_N_CONND 4 /* # connection daemons */
#define RANAL_MIN_RECONNECT_INTERVAL 1 /* first failed connection retry (seconds)... */
#define RANAL_MAX_RECONNECT_INTERVAL 60 /* ...exponentially increasing to this */
-#define RANAL_FMA_PREFIX_LEN 232 /* size of FMA "Prefix" */
-#define RANAL_FMA_MAX_DATA_LEN ((7<<10)-256) /* Max FMA MSG is 7K including prefix */
+#define RANAL_FMA_MAX_PREFIX 232 /* max size of FMA "Prefix" */
+#define RANAL_FMA_MAX_DATA ((7<<10)-256) /* Max FMA MSG is 7K including prefix */
#define RANAL_PEER_HASH_SIZE 101 /* # peer lists */
#define RANAL_CONN_HASH_SIZE 101 /* # conn lists */
/* default vals for runtime tunables */
#define RANAL_TIMEOUT 30 /* comms timeout (seconds) */
#define RANAL_LISTENER_TIMEOUT 5 /* listener timeout (seconds) */
-#define RANAL_MAX_IMMEDIATE (2<<10) /* biggest immediate payload */
+#define RANAL_MAX_IMMEDIATE (2<<10) /* immediate payload breakpoint */
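
Note the distinction the renamed comments draw: RANAL_FMA_MAX_DATA above is the hard protocol limit on data carried in one FMA message, while RANAL_MAX_IMMEDIATE is only the default for the kra_max_immediate tunable, the policy breakpoint below which payloads go inline. The GET path later in this patch checks both:

if (kiov == NULL &&                           /* not paged */
    nob <= RANAL_FMA_MAX_DATA &&              /* small enough */
    nob <= kranal_tunables.kra_max_immediate)
        break;                                /* send IMMEDIATE */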
typedef struct
{
int kra_listener_timeout; /* max time the listener can block */
int kra_backlog; /* listener's backlog */
int kra_port; /* listener's TCP/IP port */
- int kra_max_immediate; /* biggest immediate payload */
+ int kra_max_immediate; /* immediate payload breakpoint */
+
struct ctl_table_header *kra_sysctl; /* sysctl interface */
} kra_tunables_t;
int rad_idx; /* index in kra_devices */
int rad_ready; /* set by device callback */
struct list_head rad_connq; /* connections requiring attention */
+ struct list_head rad_zombies; /* connections to free */
wait_queue_head_t rad_waitq; /* scheduler waits here */
spinlock_t rad_lock; /* serialise */
+ void *rad_scheduler; /* scheduling thread */
} kra_device_t;
typedef struct
struct list_head *kra_conns; /* conns hashed by cqid */
int kra_conn_hash_size; /* size of kra_conns */
- __u64 kra_next_incarnation; /* conn incarnation # generator */
+ __u64 kra_peerstamp; /* when I started up */
+ __u64 kra_connstamp; /* conn stamp generator */
int kra_next_cqid; /* cqid generator */
atomic_t kra_nconns; /* # connections extant */
{ /* (sent via socket) */
__u32 racr_magic; /* I'm an ranal connreq */
__u16 racr_version; /* this is my version number */
- __u16 racr_devid; /* which device to connect on */
- __u64 racr_nid; /* my NID */
- __u64 racr_incarnation; /* my incarnation */
- __u32 racr_timeout; /* my timeout */
- RAP_RI_PARAMETERS racr_riparams; /* my endpoint info */
+ __u16 racr_devid; /* sender's device ID */
+ __u64 racr_nid; /* sender's NID */
+ __u64 racr_peerstamp; /* sender's instance stamp */
+ __u64 racr_connstamp; /* sender's connection stamp */
+ __u32 racr_timeout; /* sender's timeout */
+ RAP_RI_PARAMETERS racr_riparams; /* sender's endpoint info */
} kra_connreq_t;
typedef struct
__u16 ram_version; /* this is my version number */
__u16 ram_type; /* msg type */
__u64 ram_srcnid; /* sender's NID */
- __u64 ram_incarnation; /* sender's connection incarnation */
+ __u64 ram_connstamp; /* sender's connection stamp */
union {
kra_immediate_msg_t immediate;
kra_putreq_msg_t putreq;
struct kra_peer *rac_peer; /* owning peer */
struct list_head rac_list; /* stash on peer's conn list */
struct list_head rac_hashlist; /* stash in connection hash table */
- struct list_head rac_schedlist; /* queue for scheduler */
+ struct list_head rac_schedlist; /* schedule (on rad_connq) for attention */
struct list_head rac_fmaq; /* txs queued for FMA */
struct list_head rac_rdmaq; /* txs awaiting RDMA completion */
struct list_head rac_replyq; /* txs awaiting replies */
- __u64 rac_peer_incarnation; /* peer's unique connection stamp */
- __u64 rac_my_incarnation; /* my unique connection stamp */
+ __u64 rac_peerstamp; /* peer's unique stamp */
+ __u64 rac_peer_connstamp; /* peer's unique connection stamp */
+ __u64 rac_my_connstamp; /* my unique connection stamp */
unsigned long rac_last_tx; /* when I last sent an FMA message */
unsigned long rac_last_rx; /* when I last received an FMA message */
long rac_keepalive; /* keepalive interval */
atomic_t rac_refcount; /* # users */
unsigned int rac_close_sent; /* I've sent CLOSE */
unsigned int rac_close_recvd; /* I've received CLOSE */
- unsigned int rac_closing; /* connection being torn down */
+ unsigned int rac_state; /* connection state */
unsigned int rac_scheduled; /* being attended to */
spinlock_t rac_lock; /* serialise */
kra_device_t *rac_device; /* which device */
kra_msg_t rac_msg; /* keepalive/CLOSE message buffer */
} kra_conn_t;
+#define RANAL_CONN_ESTABLISHED 0
+#define RANAL_CONN_CLOSING 1
+#define RANAL_CONN_CLOSED 2
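
A sketch of the lifecycle these states encode, as driven by the functions changed above:

/*
 * RANAL_CONN_ESTABLISHED
 *     | kranal_close_conn_locked(): leave peer's conn list, reset
 *     | rac_last_rx, schedule the CLOSE send
 *     v
 * RANAL_CONN_CLOSING
 *     | kranal_terminate_conn_locked(): CLOSE exchanged (or timed out);
 *     | unhash so no new callbacks, schedule final cleanup
 *     v
 * RANAL_CONN_CLOSED
 *       kranal_complete_closed_conn(): abort queued txs, drop refs
 */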
+
typedef struct kra_peer
{
struct list_head rap_list; /* stash on global peer list */
extern kra_data_t kranal_data;
extern kra_tunables_t kranal_tunables;
-extern void __kranal_peer_decref(kra_peer_t *peer);
-extern void __kranal_conn_decref(kra_conn_t *conn);
+extern void kranal_destroy_peer(kra_peer_t *peer);
+extern void kranal_destroy_conn(kra_conn_t *conn);
static inline void
kranal_peer_addref(kra_peer_t *peer)
CDEBUG(D_NET, "%p->"LPX64"\n", peer, peer->rap_nid);
LASSERT(atomic_read(&peer->rap_refcount) > 0);
if (atomic_dec_and_test(&peer->rap_refcount))
- __kranal_peer_decref(peer);
+ kranal_destroy_peer(peer);
}
static inline struct list_head *
{
unsigned int hash = ((unsigned int)nid) % kranal_data.kra_peer_hash_size;
- return (&kranal_data.kra_peers [hash]);
+ return (&kranal_data.kra_peers[hash]);
}
static inline int
CDEBUG(D_NET, "%p->"LPX64"\n", conn, conn->rac_peer->rap_nid);
LASSERT(atomic_read(&conn->rac_refcount) > 0);
if (atomic_dec_and_test(&conn->rac_refcount))
- __kranal_conn_decref(conn);
+ kranal_destroy_conn(conn);
}
static inline struct list_head *
extern int kranal_listener_procint(ctl_table *table,
int write, struct file *filp,
void *buffer, size_t *lenp);
-extern int kranal_close_stale_conns_locked (kra_peer_t *peer,
- __u64 incarnation);
extern void kranal_update_reaper_timeout(long timeout);
extern void kranal_tx_done (kra_tx_t *tx, int completion);
extern void kranal_unlink_peer_locked (kra_peer_t *peer);
spin_unlock_irqrestore(&dev->rad_lock, flags);
}
-void
-kranal_schedule_cqid (__u32 cqid)
-{
- kra_conn_t *conn;
- struct list_head *conns;
- struct list_head *tmp;
-
- conns = kranal_cqid2connlist(cqid);
-
- read_lock(&kranal_data.kra_global_lock);
-
- conn = kranal_cqid2conn_locked(cqid);
-
- if (conn == NULL)
- CWARN("no cqid %x\n", cqid);
- else
- kranal_schedule_conn(conn);
-
- read_unlock(&kranal_data.kra_global_lock);
-}
-
-void
-kranal_schedule_dev(kra_device_t *dev)
-{
- kra_conn_t *conn;
- struct list_head *conns;
- struct list_head *tmp;
- int i;
-
- /* Don't do this in IRQ context (servers may have 1000s of clients) */
- LASSERT (!in_interrupt());
-
- CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);
-
- for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {
-
- /* Drop the lock on each hash bucket to ensure we don't
- * block anyone for too long at IRQ priority on another CPU */
-
- read_lock(&kranal_data.kra_global_lock);
-
- conns = &kranal_data.kra_conns[i];
-
- list_for_each (tmp, conns) {
- conn = list_entry(tmp, kra_conn_t, rac_hashlist);
-
- if (conn->rac_device == dev)
- kranal_schedule_conn(conn);
- }
- read_unlock(&kranal_data.kra_global_lock);
- }
-}
-
-void
-kranal_tx_done (kra_tx_t *tx, int completion)
-{
- ptl_err_t ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
- kra_device_t *dev;
- unsigned long flags;
- int i;
- RAP_RETURN rrc;
-
- LASSERT (!in_interrupt());
-
- switch (tx->tx_buftype) {
- default:
- LBUG();
-
- case RANAL_BUF_NONE:
- case RANAL_BUF_IMMEDIATE:
- case RANAL_BUF_PHYS_UNMAPPED:
- case RANAL_BUF_VIRT_UNMAPPED:
- break;
-
- case RANAL_BUF_PHYS_MAPPED:
- LASSERT (tx->tx_conn != NULL);
- dev = tx->tx_conn->rac_device;
- rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
- dev->rad_ptag, &tx->tx_map_key);
- LASSERT (rrc == RAP_SUCCESS);
- break;
-
- case RANAL_BUF_VIRT_MAPPED:
- LASSERT (tx->tx_conn != NULL);
- dev = tx->tx_conn->rac_device;
- rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
- dev->rad_ptag, &tx->tx_map_key);
- LASSERT (rrc == RAP_SUCCESS);
- break;
- }
-
- for (i = 0; i < 2; i++) {
- /* tx may have up to 2 libmsgs to finalise */
- if (tx->tx_libmsg[i] == NULL)
- continue;
-
- lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
- tx->tx_libmsg[i] = NULL;
- }
-
- tx->tx_buftype = RANAL_BUF_NONE;
- tx->tx_msg.ram_type = RANAL_MSG_NONE;
- tx->tx_conn = NULL;
-
- spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
-
- if (tx->tx_isnblk) {
- list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
- } else {
- list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
- wake_up(&kranal_data.kra_idle_tx_waitq);
- }
-
- spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
-}
-
kra_tx_t *
kranal_get_idle_tx (int may_block)
{
msg->ram_version = RANAL_MSG_VERSION;
msg->ram_type = type;
msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
- /* ram_incarnation gets set when FMA is sent */
+ /* ram_connstamp gets set when FMA is sent */
}
kra_tx_t *
int offset, int nob)
{
+ /* For now this is almost identical to kranal_setup_virt_buffer, but we
+ * could "flatten" the payload into a single contiguous buffer ready
+ * for sending direct over an FMA if we ever needed to. */
+
LASSERT (nob > 0);
LASSERT (niov > 0);
LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
if (kiov->kiov_offset != 0 ||
((resid > PAGE_SIZE) &&
kiov->kiov_len < PAGE_SIZE)) {
- int i;
/* Can't have gaps */
CERROR("Can't make payload contiguous in I/O VM:"
"page %d, offset %d, len %d \n",
kra_device_t *dev = conn->rac_device;
RAP_RETURN rrc;
+ LASSERT (current == dev->rad_scheduler);
+
switch (tx->tx_buftype) {
default:
+ LBUG();
+
+ case RANAL_BUF_NONE:
+ case RANAL_BUF_IMMEDIATE:
+ case RANAL_BUF_PHYS_MAPPED:
+ case RANAL_BUF_VIRT_MAPPED:
+ break;
case RANAL_BUF_PHYS_UNMAPPED:
- rrc = RapkRegisterPhys(conn->rac_device->rad_handle,
+ rrc = RapkRegisterPhys(dev->rad_handle,
tx->tx_phys, tx->tx_phys_npages,
- conn->rac_device->rad_ptag,
- &tx->tx_map_key);
+ dev->rad_ptag, &tx->tx_map_key);
LASSERT (rrc == RAP_SUCCESS);
tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
- return;
+ break;
case RANAL_BUF_VIRT_UNMAPPED:
- rrc = RapkRegisterMemory(conn->rac_device->rad_handle,
+ rrc = RapkRegisterMemory(dev->rad_handle,
tx->tx_buffer, tx->tx_nob,
- conn->rac_device->rad_ptag,
- &tx->tx_map_key);
+ dev->rad_ptag, &tx->tx_map_key);
LASSERT (rrc == RAP_SUCCESS);
tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
- return;
+ break;
+ }
+}
+
+void
+kranal_unmap_buffer (kra_tx_t *tx)
+{
+ kra_device_t *dev;
+ RAP_RETURN rrc;
+
+ switch (tx->tx_buftype) {
+ default:
+ LBUG();
+
+ case RANAL_BUF_NONE:
+ case RANAL_BUF_IMMEDIATE:
+ case RANAL_BUF_PHYS_UNMAPPED:
+ case RANAL_BUF_VIRT_UNMAPPED:
+ break;
+
+ case RANAL_BUF_PHYS_MAPPED:
+ LASSERT (tx->tx_conn != NULL);
+ dev = tx->tx_conn->rac_device;
+ LASSERT (current == dev->rad_scheduler);
+ rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
+ dev->rad_ptag, &tx->tx_map_key);
+ LASSERT (rrc == RAP_SUCCESS);
+ tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
+ break;
+
+ case RANAL_BUF_VIRT_MAPPED:
+ LASSERT (tx->tx_conn != NULL);
+ dev = tx->tx_conn->rac_device;
+ LASSERT (current == dev->rad_scheduler);
+ rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
+ dev->rad_ptag, &tx->tx_map_key);
+ LASSERT (rrc == RAP_SUCCESS);
+ tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
+ break;
+ }
+}
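
kranal_unmap_buffer() is the exact inverse of kranal_map_buffer() above, which is what lets kranal_tx_done() below call it unconditionally:

/* Buffer-state pairs:
 *   map:   PHYS_UNMAPPED -> PHYS_MAPPED, VIRT_UNMAPPED -> VIRT_MAPPED
 *   unmap: PHYS_MAPPED -> PHYS_UNMAPPED, VIRT_MAPPED -> VIRT_UNMAPPED
 *   NONE and IMMEDIATE are no-ops in both directions.
 */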
+
+void
+kranal_tx_done (kra_tx_t *tx, int completion)
+{
+ ptl_err_t ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
+ unsigned long flags;
+ int i;
+
+ LASSERT (!in_interrupt());
+
+ kranal_unmap_buffer(tx);
+
+ for (i = 0; i < 2; i++) {
+ /* tx may have up to 2 libmsgs to finalise */
+ if (tx->tx_libmsg[i] == NULL)
+ continue;
+
+ lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
+ tx->tx_libmsg[i] = NULL;
+ }
+
+ tx->tx_buftype = RANAL_BUF_NONE;
+ tx->tx_msg.ram_type = RANAL_MSG_NONE;
+ tx->tx_conn = NULL;
+
+ spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
+
+ if (tx->tx_isnblk) {
+ list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
+ } else {
+ list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
+ wake_up(&kranal_data.kra_idle_tx_waitq);
}
+
+ spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
}
kra_conn_t *
LASSERT (peer->rap_persistence > 0);
if (!peer->rap_connecting) {
+ LASSERT (list_empty(&peer->rap_tx_queue));
+
now = CURRENT_TIME;
if (now < peer->rap_reconnect_time) {
write_unlock_irqrestore(g_lock, flags);
write_unlock_irqrestore(g_lock, flags);
}
-static void
+void
kranal_rdma(kra_tx_t *tx, int type,
- kra_rdma_desc_t *rard, int nob, __u64 cookie)
+ kra_rdma_desc_t *sink, int nob, __u64 cookie)
{
kra_conn_t *conn = tx->tx_conn;
RAP_RETURN rrc;
unsigned long flags;
- /* prep final completion message */
- kranal_init_msg(&tx->tx_msg, type);
- tx->tx_msg.ram_u.completion.racm_cookie = cookie;
-
- LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
- tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
- LASSERT (nob <= rard->rard_nob);
+ LASSERT (kranal_tx_mapped(tx));
+ LASSERT (nob <= sink->rard_nob);
+ LASSERT (nob <= tx->tx_nob);
+ /* No actual race with scheduler sending CLOSE (I'm she!) */
+ LASSERT (current == conn->rac_device->rad_scheduler);
+
memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
- tx->tx_rdma_desc.DstPtr = rard->rard_addr;
- tx->tx_rdma_desc.DstKey = rard->rard_key;
+ tx->tx_rdma_desc.DstPtr = sink->rard_addr;
+ tx->tx_rdma_desc.DstKey = sink->rard_key;
tx->tx_rdma_desc.Length = nob;
tx->tx_rdma_desc.AppPtr = tx;
+ /* prep final completion message */
+ kranal_init_msg(&tx->tx_msg, type);
+ tx->tx_msg.ram_u.completion.racm_cookie = cookie;
+
if (nob == 0) { /* Immediate completion */
kranal_post_fma(conn, tx);
return;
}
-
+
+ LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */
+
rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
LASSERT (rrc == RAP_SUCCESS);
LASSERT (conn->rac_rxmsg != NULL);
if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
- if (nob > RANAL_MAX_IMMEDIATE) {
+ if (nob > RANAL_FMA_MAX_DATA) {
CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
nob, nid);
return PTL_FAIL;
case PTL_MSG_GET:
if (kiov == NULL && /* not paged */
- nob <= RANAL_MAX_IMMEDIATE && /* small enough */
+ nob <= RANAL_FMA_MAX_DATA && /* small enough */
nob <= kranal_tunables.kra_max_immediate)
break; /* send IMMEDIATE */
- tx = kranal_new_tx_msg(0, RANAL_MSG_GET_REQ);
+ tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_GET_REQ);
if (tx == NULL)
return PTL_NO_SPACE;
return PTL_FAIL;
}
+ tx->tx_conn = conn;
kranal_map_buffer(tx);
tx->tx_msg.ram_u.putack.rapam_src_cookie =
}
int
-kranal_check_conn (kra_conn_t *conn)
+kranal_check_conn_timeouts (kra_conn_t *conn)
{
kra_tx_t *tx;
struct list_head *ttmp;
long timeout;
unsigned long now = jiffies;
- if (!conn->rac_closing &&
+ LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
+ conn->rac_state == RANAL_CONN_CLOSING);
+
+ if (!conn->rac_close_sent &&
time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
/* not sent in a while; schedule conn so scheduler sends a keepalive */
kranal_schedule_conn(conn);
}
- /* wait twice as long for CLOSE to be sure peer is dead */
- timeout = (conn->rac_closing ? 1 : 2) * conn->rac_timeout * HZ;
+ timeout = conn->rac_timeout * HZ;
if (!conn->rac_close_recvd &&
time_after_eq(now, conn->rac_last_rx + timeout)) {
return -ETIMEDOUT;
}
- if (conn->rac_closing)
+ if (conn->rac_state != RANAL_CONN_ESTABLISHED)
return 0;
/* Check the conn's queues are moving. These are "belt+braces" checks,
}
void
-kranal_check_conns (int idx, unsigned long *min_timeoutp)
+kranal_reaper_check (int idx, unsigned long *min_timeoutp)
{
struct list_head *conns = &kranal_data.kra_conns[idx];
struct list_head *ctmp;
if (conn->rac_keepalive < *min_timeoutp )
*min_timeoutp = conn->rac_keepalive;
- rc = kranal_check_conn(conn);
+ rc = kranal_check_conn_timeouts(conn);
if (rc == 0)
continue;
kranal_conn_addref(conn);
read_unlock(&kranal_data.kra_global_lock);
- CERROR("Check on conn to "LPX64"failed: %d\n",
- conn->rac_peer->rap_nid, rc);
+ CERROR("Conn to "LPX64", cqid %d timed out\n",
+ conn->rac_peer->rap_nid, conn->rac_cqid);
write_lock_irqsave(&kranal_data.kra_global_lock, flags);
- if (!conn->rac_closing)
+ switch (conn->rac_state) {
+ default:
+ LBUG();
+
+ case RANAL_CONN_ESTABLISHED:
kranal_close_conn_locked(conn, -ETIMEDOUT);
- else
- kranal_terminate_conn_locked(conn);
+ break;
+ case RANAL_CONN_CLOSING:
+ kranal_terminate_conn_locked(conn);
+ break;
+ }
+
write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
kranal_conn_decref(conn);
wait_queue_t wait;
unsigned long flags;
kra_peer_t *peer;
- int i;
snprintf(name, sizeof(name), "kranal_connd_%02ld", (long)arg);
kportal_daemonize(name);
{
wait_queue_t wait;
unsigned long flags;
- kra_conn_t *conn;
- kra_peer_t *peer;
long timeout;
int i;
int conn_entries = kranal_data.kra_conn_hash_size;
init_waitqueue_entry(&wait, current);
spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
- kranal_data.kra_new_min_timeout = 1;
while (!kranal_data.kra_shutdown) {
chunk = 1;
for (i = 0; i < chunk; i++) {
- kranal_check_conns(conn_index,
- &next_min_timeout);
+ kranal_reaper_check(conn_index,
+ &next_min_timeout);
conn_index = (conn_index + 1) % conn_entries;
}
}
void
-kranal_process_rdmaq (__u32 cqid)
+kranal_check_rdma_cq (kra_device_t *dev)
{
kra_conn_t *conn;
kra_tx_t *tx;
RAP_RETURN rrc;
unsigned long flags;
RAP_RDMA_DESCRIPTOR *desc;
-
- read_lock(&kranal_data.kra_global_lock);
+ __u32 cqid;
+ __u32 event_type;
+
+ for (;;) {
+ rrc = RapkCQDone(dev->rad_rdma_cq, &cqid, &event_type);
+ if (rrc == RAP_NOT_DONE)
+ return;
- conn = kranal_cqid2conn_locked(cqid);
- LASSERT (conn != NULL);
+ LASSERT (rrc == RAP_SUCCESS);
+ LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);
- rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
- LASSERT (rrc == RAP_SUCCESS);
+ read_lock(&kranal_data.kra_global_lock);
- spin_lock_irqsave(&conn->rac_lock, flags);
+ conn = kranal_cqid2conn_locked(cqid);
+ if (conn == NULL) {
+ /* Conn was destroyed? */
+ CWARN("RDMA CQID lookup %d failed\n", cqid);
+ read_unlock(&kranal_data.kra_global_lock);
+ continue;
+ }
- LASSERT (!list_empty(&conn->rac_rdmaq));
- tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
- list_del(&tx->tx_list);
+ rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
+ LASSERT (rrc == RAP_SUCCESS);
- LASSERT(desc->AppPtr == (void *)tx);
- LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
- tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);
+ spin_lock_irqsave(&conn->rac_lock, flags);
- list_add_tail(&tx->tx_list, &conn->rac_fmaq);
- tx->tx_qtime = jiffies;
+ LASSERT (!list_empty(&conn->rac_rdmaq));
+ tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
+ list_del(&tx->tx_list);
+
+ LASSERT(desc->AppPtr == (void *)tx);
+ LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
+ tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);
+
+ list_add_tail(&tx->tx_list, &conn->rac_fmaq);
+ tx->tx_qtime = jiffies;
- spin_unlock_irqrestore(&conn->rac_lock, flags);
+ spin_unlock_irqrestore(&conn->rac_lock, flags);
- /* Get conn's fmaq processed, now I've just put something there */
- kranal_schedule_conn(conn);
+ /* Get conn's fmaq processed, now I've just put something
+ * there */
+ kranal_schedule_conn(conn);
- read_unlock(&kranal_data.kra_global_lock);
+ read_unlock(&kranal_data.kra_global_lock);
+ }
+}
+
+void
+kranal_check_fma_cq (kra_device_t *dev)
+{
+ kra_conn_t *conn;
+ RAP_RETURN rrc;
+ __u32 cqid;
+ __u32 event_type;
+ struct list_head *conns;
+ struct list_head *tmp;
+ int i;
+
+ for (;;) {
+ rrc = RapkCQDone(dev->rad_fma_cq, &cqid, &event_type);
+ if (rrc == RAP_NOT_DONE)
+ return;
+
+ LASSERT (rrc == RAP_SUCCESS);
+
+ if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {
+
+ read_lock(&kranal_data.kra_global_lock);
+
+ conn = kranal_cqid2conn_locked(cqid);
+ if (conn == NULL)
+ CWARN("FMA CQID lookup %d failed\n", cqid);
+ else
+ kranal_schedule_conn(conn);
+
+ read_unlock(&kranal_data.kra_global_lock);
+ continue;
+ }
+
+ /* FMA CQ has overflowed: check ALL conns */
+ CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);
+
+ for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {
+
+ read_lock(&kranal_data.kra_global_lock);
+
+ conns = &kranal_data.kra_conns[i];
+
+ list_for_each (tmp, conns) {
+ conn = list_entry(tmp, kra_conn_t,
+ rac_hashlist);
+
+ if (conn->rac_device == dev)
+ kranal_schedule_conn(conn);
+ }
+
+ /* don't block write lockers for too long... */
+ read_unlock(&kranal_data.kra_global_lock);
+ }
+ }
}
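
Both CQ pollers above share the same drain loop; a condensed sketch of the pattern (handle_completion is a placeholder for the per-CQ logic):

for (;;) {
        rrc = RapkCQDone(cq, &cqid, &event_type);
        if (rrc == RAP_NOT_DONE)
                return;                         /* CQ drained */
        LASSERT (rrc == RAP_SUCCESS);
        handle_completion(cqid, event_type);    /* per-CQ handling */
}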
int
int sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
RAP_RETURN rrc;
- LASSERT (sizeof(*msg) <= RANAL_FMA_PREFIX_LEN);
+ LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
- immediatenob <= RANAL_FMA_MAX_DATA_LEN :
+ immediatenob <= RANAL_FMA_MAX_DATA :
immediatenob == 0);
- msg->ram_incarnation = conn->rac_my_incarnation;
+ msg->ram_connstamp = conn->rac_my_connstamp;
msg->ram_seq = conn->rac_tx_seq;
if (sync)
}
}
-int
+void
kranal_process_fmaq (kra_conn_t *conn)
{
unsigned long flags;
int rc;
int expect_reply;
- /* NB I will be rescheduled some via a rad_fma_cq event if my FMA is
- * out of credits when I try to send right now... */
-
- if (conn->rac_closing) {
+ /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
+ * However I will be rescheduled by a rad_fma_cq event when
+ * I eventually get some.
+ * NB 2. Sampling rac_state here races with it being set in
+ * kranal_close_conn_locked. But it doesn't matter if I try to
+ * send a "real" message just as I start closing because I'll get
+ * scheduled to send the close anyway. */
+ if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
if (!list_empty(&conn->rac_rdmaq)) {
- /* Can't send CLOSE yet; I'm still waiting for RDMAs I
- * posted to finish */
+ /* RDMAs in progress */
LASSERT (!conn->rac_close_sent);
- kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
- kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
- return 0;
+
+ if (time_after_eq(jiffies,
+ conn->rac_last_tx +
+ conn->rac_keepalive * HZ)) {
+ kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
+ kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
+ }
+ return;
}
-
- if (conn->rac_close_sent)
- return 0;
+ if (conn->rac_close_sent)
+ return;
+
kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
- conn->rac_close_sent = (rc == 0);
- return 0;
+ if (rc != 0)
+ return;
+
+ conn->rac_close_sent = 1;
+ if (!conn->rac_close_recvd)
+ return;
+
+ write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+ if (conn->rac_state == RANAL_CONN_CLOSING)
+ kranal_terminate_conn_locked(conn);
+
+ write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+ return;
}
spin_lock_irqsave(&conn->rac_lock, flags);
kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
}
- return 0;
+ return;
}
tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
}
if (rc == -EAGAIN) {
- /* replace at the head of the list for later */
+ /* I need credits to send this. Replace tx at the head of the
+ * fmaq and I'll get rescheduled when credits appear */
spin_lock_irqsave(&conn->rac_lock, flags);
list_add(&tx->tx_list, &conn->rac_fmaq);
spin_unlock_irqrestore(&conn->rac_lock, flags);
-
- return 0;
+ return;
}
LASSERT (rc == 0);
spin_unlock_irqrestore(&conn->rac_lock, flags);
}
- return more_to_do;
+ if (more_to_do)
+ kranal_schedule_conn(conn);
}
static inline void
kra_tx_t *
kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
{
- unsigned long flags;
struct list_head *ttmp;
kra_tx_t *tx;
return NULL;
}
-int
-kranal_process_receives(kra_conn_t *conn)
+void
+kranal_check_fma_rx (kra_conn_t *conn)
{
unsigned long flags;
__u32 seq;
- __u32 nob;
kra_tx_t *tx;
kra_msg_t *msg;
void *prefix;
kra_peer_t *peer = conn->rac_peer;
if (rrc == RAP_NOT_DONE)
- return 0;
+ return;
LASSERT (rrc == RAP_SUCCESS);
conn->rac_last_rx = jiffies;
__swab16s(&msg->ram_version);
__swab16s(&msg->ram_type);
__swab64s(&msg->ram_srcnid);
- __swab64s(&msg->ram_incarnation);
+ __swab64s(&msg->ram_connstamp);
__swab32s(&msg->ram_seq);
/* NB message type checked below; NOT here... */
goto out;
}
- if (msg->ram_incarnation != conn->rac_peer_incarnation) {
- CERROR("Unexpected incarnation "LPX64"("LPX64
+ if (msg->ram_connstamp != conn->rac_peer_connstamp) {
+ CERROR("Unexpected connstamp "LPX64"("LPX64
" expected) from "LPX64"\n",
- msg->ram_incarnation, conn->rac_peer_incarnation,
+ msg->ram_connstamp, conn->rac_peer_connstamp,
peer->rap_nid);
goto out;
}
}
if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
- /* This message signals RDMA completion: wait now... */
+ /* This message signals RDMA completion... */
rrc = RapkFmaSyncWait(conn->rac_rihandle);
LASSERT (rrc == RAP_SUCCESS);
}
-
+
+ if (conn->rac_close_recvd) {
+ CERROR("Unexpected message %d after CLOSE from "LPX64"\n",
+ msg->ram_type, conn->rac_peer->rap_nid);
+ goto out;
+ }
+
if (msg->ram_type == RANAL_MSG_CLOSE) {
conn->rac_close_recvd = 1;
write_lock_irqsave(&kranal_data.kra_global_lock, flags);
- if (!conn->rac_closing)
- kranal_close_conn_locked(conn, -ETIMEDOUT);
+ if (conn->rac_state == RANAL_CONN_ESTABLISHED)
+ kranal_close_conn_locked(conn, 0);
else if (conn->rac_close_sent)
kranal_terminate_conn_locked(conn);
goto out;
}
- if (conn->rac_closing)
+ if (conn->rac_state != RANAL_CONN_ESTABLISHED)
goto out;
conn->rac_rxmsg = msg; /* stash message for portals callbacks */
if (conn->rac_rxmsg != NULL)
kranal_consume_rxmsg(conn, NULL, 0);
- return 1;
+ /* check again later */
+ kranal_schedule_conn(conn);
+}
+
+void
+kranal_complete_closed_conn (kra_conn_t *conn)
+{
+ kra_tx_t *tx;
+
+ LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
+
+ while (!list_empty(&conn->rac_fmaq)) {
+ tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
+
+ list_del(&tx->tx_list);
+ kranal_tx_done(tx, -ECONNABORTED);
+ }
+
+ LASSERT (list_empty(&conn->rac_rdmaq));
+
+ while (!list_empty(&conn->rac_replyq)) {
+ tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
+
+ list_del(&tx->tx_list);
+ kranal_tx_done(tx, -ECONNABORTED);
+ }
}
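
Note the asymmetry above: rac_fmaq and rac_replyq are flushed with -ECONNABORTED, but rac_rdmaq is asserted empty.

/* rac_rdmaq must be empty by the time a conn reaches CLOSED: the
 * scheduler refuses to send CLOSE while RDMAs are outstanding (see the
 * RDMAs-in-progress branch of kranal_process_fmaq() above, and the
 * "CLOSE == RDMA idle" assertion in kranal_rdma()), and a conn only
 * becomes CLOSED after its CLOSE has been dealt with. */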
int
char name[16];
kra_conn_t *conn;
unsigned long flags;
- RAP_RETURN rrc;
- int rc;
- int resched;
- int i;
- __u32 cqid;
- __u32 event_type;
- int did_something;
int busy_loops = 0;
snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
kportal_daemonize(name);
kportal_blockallsigs();
+ dev->rad_scheduler = current;
init_waitqueue_entry(&wait, current);
spin_lock_irqsave(&dev->rad_lock, flags);
spin_lock_irqsave(&dev->rad_lock, flags);
}
- did_something = 0;
-
if (dev->rad_ready) {
+ /* Device callback fired since I last checked it */
dev->rad_ready = 0;
spin_unlock_irqrestore(&dev->rad_lock, flags);
- rrc = RapkCQDone(dev->rad_rdma_cq, &cqid, &event_type);
+ kranal_check_rdma_cq(dev);
+ kranal_check_fma_cq(dev);
- LASSERT (rrc == RAP_SUCCESS || rrc == RAP_NOT_DONE);
- LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);
-
- if (rrc == RAP_SUCCESS) {
- kranal_process_rdmaq(cqid);
- did_something = 1;
- }
-
- rrc = RapkCQDone(dev->rad_fma_cq, &cqid, &event_type);
- LASSERT (rrc == RAP_SUCCESS || rrc == RAP_NOT_DONE);
-
- if (rrc == RAP_SUCCESS) {
- if ((event_type & RAPK_CQ_EVENT_OVERRUN) != 0)
- kranal_schedule_dev(dev);
- else
- kranal_schedule_cqid(cqid);
- did_something = 1;
- }
-
spin_lock_irqsave(&dev->rad_lock, flags);
-
- /* If there were no completions to handle, I leave
- * rad_ready clear. NB I cleared it BEFORE I checked
- * the completion queues since I'm racing with the
- * device callback. */
-
- if (did_something)
- dev->rad_ready = 1;
}
if (!list_empty(&dev->rad_connq)) {
+ /* Connection needs attention */
conn = list_entry(dev->rad_connq.next,
kra_conn_t, rac_schedlist);
- list_del(&conn->rac_schedlist);
+ list_del_init(&conn->rac_schedlist);
+ LASSERT (conn->rac_scheduled);
+ conn->rac_scheduled = 0;
spin_unlock_irqrestore(&dev->rad_lock, flags);
- LASSERT (conn->rac_scheduled);
+ kranal_check_fma_rx(conn);
+ kranal_process_fmaq(conn);
- resched = kranal_process_fmaq(conn);
- resched |= kranal_process_receives(conn);
- did_something = 1;
+ if (conn->rac_state == RANAL_CONN_CLOSED)
+ kranal_complete_closed_conn(conn);
+ kranal_conn_decref(conn);
+
spin_lock_irqsave(&dev->rad_lock, flags);
- if (resched)
- list_add_tail(&conn->rac_schedlist,
- &dev->rad_connq);
- }
-
- if (did_something)
continue;
+ }
add_wait_queue(&dev->rad_waitq, &wait);
set_current_state(TASK_INTERRUPTIBLE);
spin_unlock_irqrestore(&dev->rad_lock, flags);
+ dev->rad_scheduler = NULL;
kranal_thread_fini();
return 0;
}