From 7f1aed86cbd26149a5ad07db4ebe2260ac0656c7 Mon Sep 17 00:00:00 2001 From: eeb Date: Mon, 20 Dec 2004 21:24:34 +0000 Subject: [PATCH] * ranal code review --- lnet/klnds/ralnd/Makefile.in | 2 +- lnet/klnds/ralnd/ralnd.c | 346 ++++++++++++++----------- lnet/klnds/ralnd/ralnd.h | 58 +++-- lnet/klnds/ralnd/ralnd_cb.c | 602 ++++++++++++++++++++++++------------------- 4 files changed, 567 insertions(+), 441 deletions(-) diff --git a/lnet/klnds/ralnd/Makefile.in b/lnet/klnds/ralnd/Makefile.in index 1772cc2..eb806e2 100644 --- a/lnet/klnds/ralnd/Makefile.in +++ b/lnet/klnds/ralnd/Makefile.in @@ -1,6 +1,6 @@ MODULES := kranal kranal-objs := ranal.o ranal_cb.o -EXTRA_POST_CFLAGS := @RACPPFLAGS@ +EXTRA_POST_CFLAGS := @RACPPFLAGS@ -Wall @INCLUDE_RULES@ diff --git a/lnet/klnds/ralnd/ralnd.c b/lnet/klnds/ralnd/ralnd.c index c924827..0aa1d53 100644 --- a/lnet/klnds/ralnd/ralnd.c +++ b/lnet/klnds/ralnd/ralnd.c @@ -162,7 +162,6 @@ kranal_create_sock(struct socket **sockp) { struct socket *sock; int rc; - struct timeval tv; int option; mm_segment_t oldmm = get_fs(); @@ -215,12 +214,13 @@ kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn) memset(connreq, 0, sizeof(*connreq)); - connreq->racr_magic = RANAL_MSG_MAGIC; - connreq->racr_version = RANAL_MSG_VERSION; - connreq->racr_devid = conn->rac_device->rad_id; - connreq->racr_nid = kranal_lib.libnal_ni.ni_pid.nid; - connreq->racr_timeout = conn->rac_timeout; - connreq->racr_incarnation = conn->rac_my_incarnation; + connreq->racr_magic = RANAL_MSG_MAGIC; + connreq->racr_version = RANAL_MSG_VERSION; + connreq->racr_devid = conn->rac_device->rad_id; + connreq->racr_nid = kranal_lib.libnal_ni.ni_pid.nid; + connreq->racr_peerstamp = kranal_data.kra_peerstamp; + connreq->racr_connstamp = conn->rac_my_connstamp; + connreq->racr_timeout = conn->rac_timeout; rrc = RapkGetRiParams(conn->rac_rihandle, &connreq->racr_riparams); LASSERT(rrc == RAP_SUCCESS); @@ -229,7 +229,6 @@ kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t 
*conn) int kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout) { - int i; int rc; rc = kranal_sock_read(sock, connreq, sizeof(*connreq), timeout); @@ -248,7 +247,8 @@ kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout) __swab16s(&connreq->racr_version); __swab16s(&connreq->racr_devid); __swab64s(&connreq->racr_nid); - __swab64s(&connreq->racr_incarnation); + __swab64s(&connreq->racr_peerstamp); + __swab64s(&connreq->racr_connstamp); __swab32s(&connreq->racr_timeout); __swab32s(&connreq->racr_riparams.FmaDomainHndl); @@ -273,43 +273,100 @@ kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout) return -EPROTO; } - for (i = 0; i < kranal_data.kra_ndevs; i++) - if (connreq->racr_devid == - kranal_data.kra_devices[i].rad_id) - break; + return 0; +} - if (i == kranal_data.kra_ndevs) { - CERROR("Can't match device %d\n", connreq->racr_devid); - return -ENODEV; +int +kranal_close_stale_conns_locked (kra_peer_t *peer, kra_conn_t *newconn) +{ + kra_conn_t *conn; + struct list_head *ctmp; + struct list_head *cnxt; + int loopback; + int count = 0; + + loopback = peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid; + + list_for_each_safe (ctmp, cnxt, &peer->rap_conns) { + conn = list_entry(ctmp, kra_conn_t, rac_list); + + if (conn == newconn) + continue; + + if (conn->rac_peerstamp != newconn->rac_peerstamp) { + CDEBUG(D_NET, "Closing stale conn nid:"LPX64 + " peerstamp:"LPX64"("LPX64")\n", peer->rap_nid, + conn->rac_peerstamp, newconn->rac_peerstamp); + LASSERT (conn->rac_peerstamp < newconn->rac_peerstamp); + count++; + kranal_close_conn_locked(conn, -ESTALE); + continue; + } + + if (conn->rac_device != newconn->rac_device) + continue; + + if (loopback && + newconn->rac_my_connstamp == conn->rac_peer_connstamp && + newconn->rac_peer_connstamp == conn->rac_my_connstamp) + continue; + + LASSERT (conn->rac_peer_connstamp < newconn->rac_peer_connstamp); + + CDEBUG(D_NET, "Closing stale conn nid:"LPX64 + " 
connstamp:"LPX64"("LPX64")\n", peer->rap_nid, + conn->rac_peer_connstamp, newconn->rac_peer_connstamp); + + count++; + kranal_close_conn_locked(conn, -ESTALE); } - return 0; + return count; } int -kranal_conn_isdup_locked(kra_peer_t *peer, __u64 incarnation) +kranal_conn_isdup_locked(kra_peer_t *peer, kra_conn_t *newconn) { kra_conn_t *conn; struct list_head *tmp; - int loopback = 0; + int loopback; + loopback = peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid; + list_for_each(tmp, &peer->rap_conns) { conn = list_entry(tmp, kra_conn_t, rac_list); - if (conn->rac_peer_incarnation < incarnation) { - /* Conns with an older incarnation get culled later */ + /* 'newconn' is from an earlier version of 'peer'!!! */ + if (newconn->rac_peerstamp < conn->rac_peerstamp) + return 1; + + /* 'conn' is from an earlier version of 'peer': it will be + * removed when we cull stale conns later on... */ + if (newconn->rac_peerstamp > conn->rac_peerstamp) continue; - } - if (!loopback && - conn->rac_peer_incarnation == incarnation && - peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid) { - /* loopback creates 2 conns */ - loopback = 1; + /* Different devices are OK */ + if (conn->rac_device != newconn->rac_device) + continue; + + /* It's me connecting to myself */ + if (loopback && + newconn->rac_my_connstamp == conn->rac_peer_connstamp && + newconn->rac_peer_connstamp == conn->rac_my_connstamp) continue; - } - return 1; + /* 'newconn' is an earlier connection from 'peer'!!! */ + if (newconn->rac_peer_connstamp < conn->rac_peer_connstamp) + return 2; + + /* 'conn' is an earlier connection from 'peer': it will be + * removed when we cull stale conns later on... */ + if (newconn->rac_peer_connstamp > conn->rac_peer_connstamp) + continue; + + /* 'newconn' has the SAME connection stamp; 'peer' isn't + * playing the game... 
*/ + return 3; } return 0; @@ -322,7 +379,7 @@ kranal_set_conn_uniqueness (kra_conn_t *conn) write_lock_irqsave(&kranal_data.kra_global_lock, flags); - conn->rac_my_incarnation = kranal_data.kra_next_incarnation++; + conn->rac_my_connstamp = kranal_data.kra_connstamp++; do { /* allocate a unique cqid */ conn->rac_cqid = kranal_data.kra_next_cqid++; @@ -333,7 +390,7 @@ kranal_set_conn_uniqueness (kra_conn_t *conn) } int -kranal_alloc_conn(kra_conn_t **connp, kra_device_t *dev) +kranal_create_conn(kra_conn_t **connp, kra_device_t *dev) { kra_conn_t *conn; RAP_RETURN rrc; @@ -348,6 +405,7 @@ kranal_alloc_conn(kra_conn_t **connp, kra_device_t *dev) atomic_set(&conn->rac_refcount, 1); INIT_LIST_HEAD(&conn->rac_list); INIT_LIST_HEAD(&conn->rac_hashlist); + INIT_LIST_HEAD(&conn->rac_schedlist); INIT_LIST_HEAD(&conn->rac_fmaq); INIT_LIST_HEAD(&conn->rac_rdmaq); INIT_LIST_HEAD(&conn->rac_replyq); @@ -374,34 +432,20 @@ kranal_alloc_conn(kra_conn_t **connp, kra_device_t *dev) } void -__kranal_conn_decref(kra_conn_t *conn) +kranal_destroy_conn(kra_conn_t *conn) { - kra_tx_t *tx; RAP_RETURN rrc; LASSERT (!in_interrupt()); LASSERT (!conn->rac_scheduled); LASSERT (list_empty(&conn->rac_list)); LASSERT (list_empty(&conn->rac_hashlist)); + LASSERT (list_empty(&conn->rac_schedlist)); LASSERT (atomic_read(&conn->rac_refcount) == 0); - - while (!list_empty(&conn->rac_fmaq)) { - tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list); - - list_del(&tx->tx_list); - kranal_tx_done(tx, -ECONNABORTED); - } - - /* We may not destroy this connection while it has RDMAs outstanding */ + LASSERT (list_empty(&conn->rac_fmaq)); LASSERT (list_empty(&conn->rac_rdmaq)); + LASSERT (list_empty(&conn->rac_replyq)); - while (!list_empty(&conn->rac_replyq)) { - tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list); - - list_del(&tx->tx_list); - kranal_tx_done(tx, -ECONNABORTED); - } - rrc = RapkDestroyRi(conn->rac_device->rad_handle, conn->rac_rihandle); LASSERT (rrc == RAP_SUCCESS); @@ -416,18 
+460,20 @@ __kranal_conn_decref(kra_conn_t *conn) void kranal_terminate_conn_locked (kra_conn_t *conn) { - kra_peer_t *peer = conn->rac_peer; - LASSERT (!in_interrupt()); - LASSERT (conn->rac_closing); + LASSERT (conn->rac_state == RANAL_CONN_CLOSING); LASSERT (!list_empty(&conn->rac_hashlist)); LASSERT (list_empty(&conn->rac_list)); - /* Remove from conn hash table (no new callbacks) */ + /* Remove from conn hash table: no new callbacks */ list_del_init(&conn->rac_hashlist); kranal_conn_decref(conn); - /* Conn is now just waiting for remaining refs to go */ + conn->rac_state = RANAL_CONN_CLOSED; + + /* schedule to clear out all uncompleted comms in context of dev's + * scheduler */ + kranal_schedule_conn(conn); } void @@ -439,7 +485,7 @@ kranal_close_conn_locked (kra_conn_t *conn, int error) "closing conn to "LPX64": error %d\n", peer->rap_nid, error); LASSERT (!in_interrupt()); - LASSERT (!conn->rac_closing); + LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED); LASSERT (!list_empty(&conn->rac_hashlist)); LASSERT (!list_empty(&conn->rac_list)); @@ -450,9 +496,14 @@ kranal_close_conn_locked (kra_conn_t *conn, int error) /* Non-persistent peer with no more conns... 
*/ kranal_unlink_peer_locked(peer); } + + /* Reset RX timeout to ensure we wait for an incoming CLOSE for the + * full timeout */ + conn->rac_last_rx = jiffies; + mb(); - conn->rac_closing = 1; - kranal_schedule_conn(conn); + conn->rac_state = RANAL_CONN_CLOSING; + kranal_schedule_conn(conn); /* schedule sending CLOSE */ kranal_conn_decref(conn); /* lose peer's ref */ } @@ -465,13 +516,33 @@ kranal_close_conn (kra_conn_t *conn, int error) write_lock_irqsave(&kranal_data.kra_global_lock, flags); - if (!conn->rac_closing) + if (conn->rac_state == RANAL_CONN_ESTABLISHED) kranal_close_conn_locked(conn, error); write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); } int +kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq, + __u32 peer_ip, int peer_port) +{ + RAP_RETURN rrc; + + rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams); + if (rrc != RAP_SUCCESS) { + CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n", + HIPQUAD(peer_ip), peer_port, rrc); + return -EPROTO; + } + + conn->rac_peerstamp = connreq->racr_peerstamp; + conn->rac_peer_connstamp = connreq->racr_connstamp; + conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq->racr_timeout); + kranal_update_reaper_timeout(conn->rac_keepalive); + return 0; +} + +int kranal_passive_conn_handshake (struct socket *sock, ptl_nid_t *peer_nidp, kra_conn_t **connp) { @@ -482,7 +553,6 @@ kranal_passive_conn_handshake (struct socket *sock, ptl_nid_t peer_nid; kra_conn_t *conn; kra_device_t *dev; - RAP_RETURN rrc; int rc; int len; int i; @@ -515,25 +585,24 @@ kranal_passive_conn_handshake (struct socket *sock, LASSERT (peer_nid != PTL_NID_ANY); for (i = 0;;i++) { - LASSERT(i < kranal_data.kra_ndevs); + if (i == kranal_data.kra_ndevs) { + CERROR("Can't match dev %d from %u.%u.%u.%u/%d\n", + connreq.racr_devid, HIPQUAD(peer_ip), peer_port); + return -ENODEV; + } dev = &kranal_data.kra_devices[i]; if (dev->rad_id == connreq.racr_devid) break; } - rc = kranal_alloc_conn(&conn, dev); + rc = 
kranal_create_conn(&conn, dev); if (rc != 0) return rc; - conn->rac_peer_incarnation = connreq.racr_incarnation; - conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq.racr_timeout); - kranal_update_reaper_timeout(conn->rac_keepalive); - - rrc = RapkSetRiParams(conn->rac_rihandle, &connreq.racr_riparams); - if (rrc != RAP_SUCCESS) { - CERROR("Can't set riparams for "LPX64": %d\n", peer_nid, rrc); + rc = kranal_set_conn_params(conn, &connreq, peer_ip, peer_port); + if (rc != 0) { kranal_conn_decref(conn); - return -EPROTO; + return rc; } kranal_pack_connreq(&connreq, conn); @@ -559,9 +628,6 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp) struct socket *sock; unsigned int port; int rc; - int option; - mm_segment_t oldmm = get_fs(); - struct timeval tv; for (port = 1023; port >= 512; port--) { @@ -621,19 +687,17 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp) int kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp) { - struct sockaddr_in dstaddr; kra_connreq_t connreq; kra_conn_t *conn; kra_device_t *dev; struct socket *sock; - RAP_RETURN rrc; int rc; int idx; idx = peer->rap_nid & 0x7fffffff; dev = &kranal_data.kra_devices[idx % kranal_data.kra_ndevs]; - rc = kranal_alloc_conn(&conn, dev); + rc = kranal_create_conn(&conn, dev); if (rc != 0) return rc; @@ -680,17 +744,10 @@ kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp) goto failed_0; } - conn->rac_peer_incarnation = connreq.racr_incarnation; - conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq.racr_timeout); - kranal_update_reaper_timeout(conn->rac_keepalive); - - rc = -ENETDOWN; - rrc = RapkSetRiParams(conn->rac_rihandle, &connreq.racr_riparams); - if (rrc != RAP_SUCCESS) { - CERROR("Can't set riparams for "LPX64": %d\n", - peer->rap_nid, rrc); + rc = kranal_set_conn_params(conn, &connreq, + peer->rap_ip, peer->rap_port); + if (rc != 0) goto failed_0; - } *connp = conn; return 0; @@ -709,13 +766,33 @@ kranal_conn_handshake (struct socket *sock, 
kra_peer_t *peer) kra_tx_t *tx; ptl_nid_t peer_nid; unsigned long flags; - unsigned long timeout; kra_conn_t *conn; int rc; int nstale; - if (sock != NULL) { - /* passive: listener accepted sock */ + if (sock == NULL) { + /* active: connd wants to connect to 'peer' */ + LASSERT (peer != NULL); + LASSERT (peer->rap_connecting); + + rc = kranal_active_conn_handshake(peer, &conn); + if (rc != 0) + return rc; + + write_lock_irqsave(&kranal_data.kra_global_lock, flags); + + if (!kranal_peer_active(peer)) { + /* raced with peer getting unlinked */ + write_unlock_irqrestore(&kranal_data.kra_global_lock, + flags); + kranal_conn_decref(conn); + return ESTALE; + } + + peer_nid = peer->rap_nid; + + } else { + /* passive: listener accepted 'sock' */ LASSERT (peer == NULL); rc = kranal_passive_conn_handshake(sock, &peer_nid, &conn); @@ -746,39 +823,21 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer) * table with no connections: I can't drop the global lock * until I've given it a connection or removed it, and when * I do 'peer' can disappear under me. */ - } else { - /* active: connd wants to connect to peer */ - LASSERT (peer != NULL); - LASSERT (peer->rap_connecting); - - rc = kranal_active_conn_handshake(peer, &conn); - if (rc != 0) - return rc; - - write_lock_irqsave(&kranal_data.kra_global_lock, flags); - - if (!kranal_peer_active(peer)) { - /* raced with peer getting unlinked */ - write_unlock_irqrestore(&kranal_data.kra_global_lock, - flags); - kranal_conn_decref(conn); - return ESTALE; - } - } + } LASSERT (kranal_peer_active(peer)); /* peer is in the peer table */ - peer_nid = peer->rap_nid; /* Refuse to duplicate an existing connection (both sides might try * to connect at once). NB we return success! 
We _do_ have a * connection (so we don't need to remove the peer from the peer * table) and we _don't_ have any blocked txs to complete */ - if (kranal_conn_isdup_locked(peer, conn->rac_peer_incarnation)) { + rc = kranal_conn_isdup_locked(peer, conn); + if (rc != 0) { LASSERT (!list_empty(&peer->rap_conns)); LASSERT (list_empty(&peer->rap_tx_queue)); write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); - CWARN("Not creating duplicate connection to "LPX64"\n", - peer_nid); + CWARN("Not creating duplicate connection to "LPX64": %d\n", + peer_nid, rc); kranal_conn_decref(conn); return 0; } @@ -800,7 +859,7 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer) kranal_post_fma(conn, tx); } - nstale = kranal_close_stale_conns_locked(peer, conn->rac_peer_incarnation); + nstale = kranal_close_stale_conns_locked(peer, conn); write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); @@ -989,7 +1048,7 @@ kranal_start_listener (void) /* Called holding kra_nid_mutex: listener stopped */ LASSERT (kranal_data.kra_listener_sock == NULL); - kranal_data.kra_listener_shutdown == 0; + kranal_data.kra_listener_shutdown = 0; pid = kernel_thread(kranal_listener, NULL, 0); if (pid < 0) { CERROR("Can't spawn listener: %ld\n", pid); @@ -1032,6 +1091,10 @@ kranal_listener_procint(ctl_table *table, int write, struct file *filp, int old_val; int rc; + /* No race with nal initialisation since the nal is setup all the time + * it's loaded. When that changes, change this! 
*/ + LASSERT (kranal_data.kra_init == RANAL_INIT_ALL); + down(&kranal_data.kra_nid_mutex); LASSERT (tunable == &kranal_tunables.kra_port || @@ -1050,18 +1113,23 @@ kranal_listener_procint(ctl_table *table, int write, struct file *filp, rc = kranal_start_listener(); if (rc != 0) { + CWARN("Unable to start listener with new tunable:" + " reverting to old value\n"); *tunable = old_val; kranal_start_listener(); } } up(&kranal_data.kra_nid_mutex); + + LASSERT (kranal_data.kra_init == RANAL_INIT_ALL); return rc; } int kranal_set_mynid(ptl_nid_t nid) { + unsigned long flags; lib_ni_t *ni = &kranal_lib.libnal_ni; int rc = 0; @@ -1079,10 +1147,14 @@ kranal_set_mynid(ptl_nid_t nid) if (kranal_data.kra_listener_sock != NULL) kranal_stop_listener(); + write_lock_irqsave(&kranal_data.kra_global_lock, flags); + kranal_data.kra_peerstamp++; + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); + ni->ni_pid.nid = nid; /* Delete all existing peers and their connections after new - * NID/incarnation set to ensure no old connections in our brave + * NID/connstamp set to ensure no old connections in our brave * new world. 
*/ kranal_del_peer(PTL_NID_ANY, 0); @@ -1109,7 +1181,8 @@ kranal_create_peer (ptl_nid_t nid) peer->rap_nid = nid; atomic_set(&peer->rap_refcount, 1); /* 1 ref for caller */ - INIT_LIST_HEAD(&peer->rap_list); /* not in the peer table yet */ + INIT_LIST_HEAD(&peer->rap_list); + INIT_LIST_HEAD(&peer->rap_connd_list); INIT_LIST_HEAD(&peer->rap_conns); INIT_LIST_HEAD(&peer->rap_tx_queue); @@ -1121,16 +1194,17 @@ kranal_create_peer (ptl_nid_t nid) } void -__kranal_peer_decref (kra_peer_t *peer) +kranal_destroy_peer (kra_peer_t *peer) { CDEBUG(D_NET, "peer "LPX64" %p deleted\n", peer->rap_nid, peer); LASSERT (atomic_read(&peer->rap_refcount) == 0); LASSERT (peer->rap_persistence == 0); LASSERT (!kranal_peer_active(peer)); - LASSERT (peer->rap_connecting == 0); + LASSERT (!peer->rap_connecting); LASSERT (list_empty(&peer->rap_conns)); LASSERT (list_empty(&peer->rap_tx_queue)); + LASSERT (list_empty(&peer->rap_connd_list)); PORTAL_FREE(peer, sizeof(*peer)); @@ -1387,31 +1461,6 @@ kranal_close_peer_conns_locked (kra_peer_t *peer, int why) } int -kranal_close_stale_conns_locked (kra_peer_t *peer, __u64 incarnation) -{ - kra_conn_t *conn; - struct list_head *ctmp; - struct list_head *cnxt; - int count = 0; - - list_for_each_safe (ctmp, cnxt, &peer->rap_conns) { - conn = list_entry(ctmp, kra_conn_t, rac_list); - - if (conn->rac_peer_incarnation == incarnation) - continue; - - CDEBUG(D_NET, "Closing stale conn nid:"LPX64" incarnation:"LPX64"("LPX64")\n", - peer->rap_nid, conn->rac_peer_incarnation, incarnation); - LASSERT (conn->rac_peer_incarnation < incarnation); - - count++; - kranal_close_conn_locked(conn, -ESTALE); - } - - return count; -} - -int kranal_close_matching_conns (ptl_nid_t nid) { unsigned long flags; @@ -1570,6 +1619,7 @@ kranal_alloc_txdescs(struct list_head *freelist, int n) tx->tx_isnblk = isnblk; tx->tx_buftype = RANAL_BUF_NONE; + tx->tx_msg.ram_type = RANAL_MSG_NONE; list_add(&tx->tx_list, freelist); } @@ -1637,6 +1687,7 @@ kranal_device_init(int id, 
kra_device_t *dev) void kranal_device_fini(kra_device_t *dev) { + LASSERT(dev->rad_scheduler == NULL); RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cq, dev->rad_ptag); RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag); RapkDestroyPtag(dev->rad_handle, dev->rad_ptag); @@ -1647,7 +1698,6 @@ void kranal_api_shutdown (nal_t *nal) { int i; - int rc; unsigned long flags; if (nal->nal_refct != 0) { @@ -1791,13 +1841,14 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid, memset(&kranal_data, 0, sizeof(kranal_data)); /* zero pointers, flags etc */ /* CAVEAT EMPTOR: Every 'Fma' message includes the sender's NID and - * a unique (for all time) incarnation so we can uniquely identify - * the sender. The incarnation is an incrementing counter + * a unique (for all time) connstamp so we can uniquely identify + * the sender. The connstamp is an incrementing counter * initialised with seconds + microseconds at startup time. So we * rely on NOT creating connections more frequently on average than - * 1MHz to ensure we don't use old incarnations when we reboot. */ + * 1MHz to ensure we don't use old connstamps when we reboot. 
*/ do_gettimeofday(&tv); - kranal_data.kra_next_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec; + kranal_data.kra_connstamp = + kranal_data.kra_peerstamp = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec; init_MUTEX(&kranal_data.kra_nid_mutex); init_MUTEX_LOCKED(&kranal_data.kra_listener_signal); @@ -1813,6 +1864,7 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid, spin_lock_init(&dev->rad_lock); } + kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT; init_waitqueue_head(&kranal_data.kra_reaper_waitq); spin_lock_init(&kranal_data.kra_reaper_lock); diff --git a/lnet/klnds/ralnd/ralnd.h b/lnet/klnds/ralnd/ralnd.h index fe130b7..1622c40 100644 --- a/lnet/klnds/ralnd/ralnd.h +++ b/lnet/klnds/ralnd/ralnd.h @@ -59,12 +59,6 @@ #include -#if CONFIG_SMP -# define RANAL_N_SCHED num_online_cpus() /* # schedulers */ -#else -# define RANAL_N_SCHED 1 /* # schedulers */ -#endif - #define RANAL_MAXDEVS 2 /* max # devices RapidArray supports */ #define RANAL_N_CONND 4 /* # connection daemons */ @@ -72,8 +66,8 @@ #define RANAL_MIN_RECONNECT_INTERVAL 1 /* first failed connection retry (seconds)... 
*/ #define RANAL_MAX_RECONNECT_INTERVAL 60 /* ...exponentially increasing to this */ -#define RANAL_FMA_PREFIX_LEN 232 /* size of FMA "Prefix" */ -#define RANAL_FMA_MAX_DATA_LEN ((7<<10)-256) /* Max FMA MSG is 7K including prefix */ +#define RANAL_FMA_MAX_PREFIX 232 /* max size of FMA "Prefix" */ +#define RANAL_FMA_MAX_DATA ((7<<10)-256) /* Max FMA MSG is 7K including prefix */ #define RANAL_PEER_HASH_SIZE 101 /* # peer lists */ #define RANAL_CONN_HASH_SIZE 101 /* # conn lists */ @@ -92,7 +86,7 @@ /* default vals for runtime tunables */ #define RANAL_TIMEOUT 30 /* comms timeout (seconds) */ #define RANAL_LISTENER_TIMEOUT 5 /* listener timeout (seconds) */ -#define RANAL_MAX_IMMEDIATE (2<<10) /* biggest immediate payload */ +#define RANAL_MAX_IMMEDIATE (2<<10) /* immediate payload breakpoint */ typedef struct { @@ -100,7 +94,8 @@ typedef struct int kra_listener_timeout; /* max time the listener can block */ int kra_backlog; /* listener's backlog */ int kra_port; /* listener's TCP/IP port */ - int kra_max_immediate; /* biggest immediate payload */ + int kra_max_immediate; /* immediate payload breakpoint */ + struct ctl_table_header *kra_sysctl; /* sysctl interface */ } kra_tunables_t; @@ -114,8 +109,10 @@ typedef struct int rad_idx; /* index in kra_devices */ int rad_ready; /* set by device callback */ struct list_head rad_connq; /* connections requiring attention */ + struct list_head rad_zombies; /* connections to free */ wait_queue_head_t rad_waitq; /* scheduler waits here */ spinlock_t rad_lock; /* serialise */ + void *rad_scheduler; /* scheduling thread */ } kra_device_t; typedef struct @@ -140,7 +137,8 @@ typedef struct struct list_head *kra_conns; /* conns hashed by cqid */ int kra_conn_hash_size; /* size of kra_conns */ - __u64 kra_next_incarnation; /* conn incarnation # generator */ + __u64 kra_peerstamp; /* when I started up */ + __u64 kra_connstamp; /* conn stamp generator */ int kra_next_cqid; /* cqid generator */ atomic_t kra_nconns; /* # connections 
extant */ @@ -173,11 +171,12 @@ typedef struct kra_connreq /* connection request/response */ { /* (sent via socket) */ __u32 racr_magic; /* I'm an ranal connreq */ __u16 racr_version; /* this is my version number */ - __u16 racr_devid; /* which device to connect on */ - __u64 racr_nid; /* my NID */ - __u64 racr_incarnation; /* my incarnation */ - __u32 racr_timeout; /* my timeout */ - RAP_RI_PARAMETERS racr_riparams; /* my endpoint info */ + __u16 racr_devid; /* sender's device ID */ + __u64 racr_nid; /* sender's NID */ + __u64 racr_peerstamp; /* sender's instance stamp */ + __u64 racr_connstamp; /* sender's connection stamp */ + __u32 racr_timeout; /* sender's timeout */ + RAP_RI_PARAMETERS racr_riparams; /* sender's endpoint info */ } kra_connreq_t; typedef struct @@ -224,7 +223,7 @@ typedef struct /* NB must fit in FMA "Prefix" * __u16 ram_version; /* this is my version number */ __u16 ram_type; /* msg type */ __u64 ram_srcnid; /* sender's NID */ - __u64 ram_incarnation; /* sender's connection incarnation */ + __u64 ram_connstamp; /* sender's connection stamp */ union { kra_immediate_msg_t immediate; kra_putreq_msg_t putreq; @@ -300,12 +299,13 @@ typedef struct kra_conn struct kra_peer *rac_peer; /* owning peer */ struct list_head rac_list; /* stash on peer's conn list */ struct list_head rac_hashlist; /* stash in connection hash table */ - struct list_head rac_schedlist; /* queue for scheduler */ + struct list_head rac_schedlist; /* schedule (on rad_connq) for attention */ struct list_head rac_fmaq; /* txs queued for FMA */ struct list_head rac_rdmaq; /* txs awaiting RDMA completion */ struct list_head rac_replyq; /* txs awaiting replies */ - __u64 rac_peer_incarnation; /* peer's unique connection stamp */ - __u64 rac_my_incarnation; /* my unique connection stamp */ + __u64 rac_peerstamp; /* peer's unique stamp */ + __u64 rac_peer_connstamp; /* peer's unique connection stamp */ + __u64 rac_my_connstamp; /* my unique connection stamp */ unsigned long 
rac_last_tx; /* when I last sent an FMA message */ unsigned long rac_last_rx; /* when I last received an FMA messages */ long rac_keepalive; /* keepalive interval */ @@ -316,7 +316,7 @@ typedef struct kra_conn atomic_t rac_refcount; /* # users */ unsigned int rac_close_sent; /* I've sent CLOSE */ unsigned int rac_close_recvd; /* I've received CLOSE */ - unsigned int rac_closing; /* connection being torn down */ + unsigned int rac_state; /* connection state */ unsigned int rac_scheduled; /* being attented to */ spinlock_t rac_lock; /* serialise */ kra_device_t *rac_device; /* which device */ @@ -325,6 +325,10 @@ typedef struct kra_conn kra_msg_t rac_msg; /* keepalive/CLOSE message buffer */ } kra_conn_t; +#define RANAL_CONN_ESTABLISHED 0 +#define RANAL_CONN_CLOSING 1 +#define RANAL_CONN_CLOSED 2 + typedef struct kra_peer { struct list_head rap_list; /* stash on global peer list */ @@ -358,8 +362,8 @@ extern lib_nal_t kranal_lib; extern kra_data_t kranal_data; extern kra_tunables_t kranal_tunables; -extern void __kranal_peer_decref(kra_peer_t *peer); -extern void __kranal_conn_decref(kra_conn_t *conn); +extern void kranal_destroy_peer(kra_peer_t *peer); +extern void kranal_destroy_conn(kra_conn_t *conn); static inline void kranal_peer_addref(kra_peer_t *peer) @@ -375,7 +379,7 @@ kranal_peer_decref(kra_peer_t *peer) CDEBUG(D_NET, "%p->"LPX64"\n", peer, peer->rap_nid); LASSERT(atomic_read(&peer->rap_refcount) > 0); if (atomic_dec_and_test(&peer->rap_refcount)) - __kranal_peer_decref(peer); + kranal_destroy_peer(peer); } static inline struct list_head * @@ -383,7 +387,7 @@ kranal_nid2peerlist (ptl_nid_t nid) { unsigned int hash = ((unsigned int)nid) % kranal_data.kra_peer_hash_size; - return (&kranal_data.kra_peers [hash]); + return (&kranal_data.kra_peers[hash]); } static inline int @@ -407,7 +411,7 @@ kranal_conn_decref(kra_conn_t *conn) CDEBUG(D_NET, "%p->"LPX64"\n", conn, conn->rac_peer->rap_nid); LASSERT(atomic_read(&conn->rac_refcount) > 0); if 
(atomic_dec_and_test(&conn->rac_refcount)) - __kranal_conn_decref(conn); + kranal_destroy_conn(conn); } static inline struct list_head * @@ -457,8 +461,6 @@ kranal_page2phys (struct page *p) extern int kranal_listener_procint(ctl_table *table, int write, struct file *filp, void *buffer, size_t *lenp); -extern int kranal_close_stale_conns_locked (kra_peer_t *peer, - __u64 incarnation); extern void kranal_update_reaper_timeout(long timeout); extern void kranal_tx_done (kra_tx_t *tx, int completion); extern void kranal_unlink_peer_locked (kra_peer_t *peer); diff --git a/lnet/klnds/ralnd/ralnd_cb.c b/lnet/klnds/ralnd/ralnd_cb.c index 9490b56..69a4b33 100644 --- a/lnet/klnds/ralnd/ralnd_cb.c +++ b/lnet/klnds/ralnd/ralnd_cb.c @@ -82,122 +82,6 @@ kranal_schedule_conn(kra_conn_t *conn) spin_unlock_irqrestore(&dev->rad_lock, flags); } -void -kranal_schedule_cqid (__u32 cqid) -{ - kra_conn_t *conn; - struct list_head *conns; - struct list_head *tmp; - - conns = kranal_cqid2connlist(cqid); - - read_lock(&kranal_data.kra_global_lock); - - conn = kranal_cqid2conn_locked(cqid); - - if (conn == NULL) - CWARN("no cqid %x\n", cqid); - else - kranal_schedule_conn(conn); - - read_unlock(&kranal_data.kra_global_lock); -} - -void -kranal_schedule_dev(kra_device_t *dev) -{ - kra_conn_t *conn; - struct list_head *conns; - struct list_head *tmp; - int i; - - /* Don't do this in IRQ context (servers may have 1000s of clients) */ - LASSERT (!in_interrupt()); - - CWARN("Scheduling ALL conns on device %d\n", dev->rad_id); - - for (i = 0; i < kranal_data.kra_conn_hash_size; i++) { - - /* Drop the lock on each hash bucket to ensure we don't - * block anyone for too long at IRQ priority on another CPU */ - - read_lock(&kranal_data.kra_global_lock); - - conns = &kranal_data.kra_conns[i]; - - list_for_each (tmp, conns) { - conn = list_entry(tmp, kra_conn_t, rac_hashlist); - - if (conn->rac_device == dev) - kranal_schedule_conn(conn); - } - read_unlock(&kranal_data.kra_global_lock); - } -} - -void 
-kranal_tx_done (kra_tx_t *tx, int completion) -{ - ptl_err_t ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL; - kra_device_t *dev; - unsigned long flags; - int i; - RAP_RETURN rrc; - - LASSERT (!in_interrupt()); - - switch (tx->tx_buftype) { - default: - LBUG(); - - case RANAL_BUF_NONE: - case RANAL_BUF_IMMEDIATE: - case RANAL_BUF_PHYS_UNMAPPED: - case RANAL_BUF_VIRT_UNMAPPED: - break; - - case RANAL_BUF_PHYS_MAPPED: - LASSERT (tx->tx_conn != NULL); - dev = tx->tx_conn->rac_device; - rrc = RapkDeregisterMemory(dev->rad_handle, NULL, - dev->rad_ptag, &tx->tx_map_key); - LASSERT (rrc == RAP_SUCCESS); - break; - - case RANAL_BUF_VIRT_MAPPED: - LASSERT (tx->tx_conn != NULL); - dev = tx->tx_conn->rac_device; - rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer, - dev->rad_ptag, &tx->tx_map_key); - LASSERT (rrc == RAP_SUCCESS); - break; - } - - for (i = 0; i < 2; i++) { - /* tx may have up to 2 libmsgs to finalise */ - if (tx->tx_libmsg[i] == NULL) - continue; - - lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc); - tx->tx_libmsg[i] = NULL; - } - - tx->tx_buftype = RANAL_BUF_NONE; - tx->tx_msg.ram_type = RANAL_MSG_NONE; - tx->tx_conn = NULL; - - spin_lock_irqsave(&kranal_data.kra_tx_lock, flags); - - if (tx->tx_isnblk) { - list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs); - } else { - list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs); - wake_up(&kranal_data.kra_idle_tx_waitq); - } - - spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags); -} - kra_tx_t * kranal_get_idle_tx (int may_block) { @@ -259,7 +143,7 @@ kranal_init_msg(kra_msg_t *msg, int type) msg->ram_version = RANAL_MSG_VERSION; msg->ram_type = type; msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid; - /* ram_incarnation gets set when FMA is sent */ + /* ram_connstamp gets set when FMA is sent */ } kra_tx_t * @@ -279,6 +163,10 @@ kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov, int offset, int nob) { + /* For now this is almost identical to 
kranal_setup_virt_buffer, but we + * could "flatten" the payload into a single contiguous buffer ready + * for sending direct over an FMA if we ever needed to. */ + LASSERT (nob > 0); LASSERT (niov > 0); LASSERT (tx->tx_buftype == RANAL_BUF_NONE); @@ -365,7 +253,6 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov, if (kiov->kiov_offset != 0 || ((resid > PAGE_SIZE) && kiov->kiov_len < PAGE_SIZE)) { - int i; /* Can't have gaps */ CERROR("Can't make payload contiguous in I/O VM:" "page %d, offset %d, len %d \n", @@ -410,27 +297,108 @@ kranal_map_buffer (kra_tx_t *tx) kra_device_t *dev = conn->rac_device; RAP_RETURN rrc; + LASSERT (current == dev->rad_scheduler); + switch (tx->tx_buftype) { default: + LBUG(); + + case RANAL_BUF_NONE: + case RANAL_BUF_IMMEDIATE: + case RANAL_BUF_PHYS_MAPPED: + case RANAL_BUF_VIRT_MAPPED: + break; case RANAL_BUF_PHYS_UNMAPPED: - rrc = RapkRegisterPhys(conn->rac_device->rad_handle, + rrc = RapkRegisterPhys(dev->rad_handle, tx->tx_phys, tx->tx_phys_npages, - conn->rac_device->rad_ptag, - &tx->tx_map_key); + dev->rad_ptag, &tx->tx_map_key); LASSERT (rrc == RAP_SUCCESS); tx->tx_buftype = RANAL_BUF_PHYS_MAPPED; - return; + break; case RANAL_BUF_VIRT_UNMAPPED: - rrc = RapkRegisterMemory(conn->rac_device->rad_handle, + rrc = RapkRegisterMemory(dev->rad_handle, tx->tx_buffer, tx->tx_nob, - conn->rac_device->rad_ptag, - &tx->tx_map_key); + dev->rad_ptag, &tx->tx_map_key); LASSERT (rrc == RAP_SUCCESS); tx->tx_buftype = RANAL_BUF_VIRT_MAPPED; - return; + break; + } +} + +void +kranal_unmap_buffer (kra_tx_t *tx) +{ + kra_device_t *dev; + RAP_RETURN rrc; + + switch (tx->tx_buftype) { + default: + LBUG(); + + case RANAL_BUF_NONE: + case RANAL_BUF_IMMEDIATE: + case RANAL_BUF_PHYS_UNMAPPED: + case RANAL_BUF_VIRT_UNMAPPED: + break; + + case RANAL_BUF_PHYS_MAPPED: + LASSERT (tx->tx_conn != NULL); + dev = tx->tx_conn->rac_device; + LASSERT (current == dev->rad_scheduler); + rrc = RapkDeregisterMemory(dev->rad_handle, NULL, + 
dev->rad_ptag, &tx->tx_map_key); + LASSERT (rrc == RAP_SUCCESS); + tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED; + break; + + case RANAL_BUF_VIRT_MAPPED: + LASSERT (tx->tx_conn != NULL); + dev = tx->tx_conn->rac_device; + LASSERT (current == dev->rad_scheduler); + rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer, + dev->rad_ptag, &tx->tx_map_key); + LASSERT (rrc == RAP_SUCCESS); + tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED; + break; + } +} + +void +kranal_tx_done (kra_tx_t *tx, int completion) +{ + ptl_err_t ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL; + unsigned long flags; + int i; + + LASSERT (!in_interrupt()); + + kranal_unmap_buffer(tx); + + for (i = 0; i < 2; i++) { + /* tx may have up to 2 libmsgs to finalise */ + if (tx->tx_libmsg[i] == NULL) + continue; + + lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc); + tx->tx_libmsg[i] = NULL; + } + + tx->tx_buftype = RANAL_BUF_NONE; + tx->tx_msg.ram_type = RANAL_MSG_NONE; + tx->tx_conn = NULL; + + spin_lock_irqsave(&kranal_data.kra_tx_lock, flags); + + if (tx->tx_isnblk) { + list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs); + } else { + list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs); + wake_up(&kranal_data.kra_idle_tx_waitq); } + + spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags); } kra_conn_t * @@ -513,6 +481,8 @@ kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid) LASSERT (peer->rap_persistence > 0); if (!peer->rap_connecting) { + LASSERT (list_empty(&peer->rap_tx_queue)); + now = CURRENT_TIME; if (now < peer->rap_reconnect_time) { write_unlock_irqrestore(g_lock, flags); @@ -538,35 +508,40 @@ kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid) write_unlock_irqrestore(g_lock, flags); } -static void +void kranal_rdma(kra_tx_t *tx, int type, - kra_rdma_desc_t *rard, int nob, __u64 cookie) + kra_rdma_desc_t *sink, int nob, __u64 cookie) { kra_conn_t *conn = tx->tx_conn; RAP_RETURN rrc; unsigned long flags; - /* prep final completion message */ - kranal_init_msg(&tx->tx_msg, type); - 
tx->tx_msg.ram_u.completion.racm_cookie = cookie; - - LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED || - tx->tx_buftype == RANAL_BUF_VIRT_MAPPED); - LASSERT (nob <= rard->rard_nob); + LASSERT (kranal_tx_mapped(tx)); + LASSERT (nob <= sink->rard_nob); + LASSERT (nob <= tx->tx_nob); + /* No actual race with scheduler sending CLOSE (I'm she!) */ + LASSERT (current == conn->rac_device->rad_scheduler); + memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc)); tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer); tx->tx_rdma_desc.SrcKey = tx->tx_map_key; - tx->tx_rdma_desc.DstPtr = rard->rard_addr; - tx->tx_rdma_desc.DstKey = rard->rard_key; + tx->tx_rdma_desc.DstPtr = sink->rard_addr; + tx->tx_rdma_desc.DstKey = sink->rard_key; tx->tx_rdma_desc.Length = nob; tx->tx_rdma_desc.AppPtr = tx; + /* prep final completion message */ + kranal_init_msg(&tx->tx_msg, type); + tx->tx_msg.ram_u.completion.racm_cookie = cookie; + if (nob == 0) { /* Immediate completion */ kranal_post_fma(conn, tx); return; } - + + LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */ + rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc); LASSERT (rrc == RAP_SUCCESS); @@ -639,7 +614,7 @@ kranal_do_send (lib_nal_t *nal, LASSERT (conn->rac_rxmsg != NULL); if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) { - if (nob > RANAL_MAX_IMMEDIATE) { + if (nob > RANAL_FMA_MAX_DATA) { CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n", nob, nid); return PTL_FAIL; @@ -676,11 +651,11 @@ kranal_do_send (lib_nal_t *nal, case PTL_MSG_GET: if (kiov == NULL && /* not paged */ - nob <= RANAL_MAX_IMMEDIATE && /* small enough */ + nob <= RANAL_FMA_MAX_DATA && /* small enough */ nob <= kranal_tunables.kra_max_immediate) break; /* send IMMEDIATE */ - tx = kranal_new_tx_msg(0, RANAL_MSG_GET_REQ); + tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_GET_REQ); if (tx == NULL) return PTL_NO_SPACE; @@ -846,6 +821,7 @@ kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg, 
return PTL_FAIL; } + tx->tx_conn = conn; kranal_map_buffer(tx); tx->tx_msg.ram_u.putack.rapam_src_cookie = @@ -903,7 +879,7 @@ kranal_thread_fini (void) } int -kranal_check_conn (kra_conn_t *conn) +kranal_check_conn_timeouts (kra_conn_t *conn) { kra_tx_t *tx; struct list_head *ttmp; @@ -911,14 +887,16 @@ kranal_check_conn (kra_conn_t *conn) long timeout; unsigned long now = jiffies; - if (!conn->rac_closing && + LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED || + conn->rac_state == RANAL_CONN_CLOSING); + + if (!conn->rac_close_sent && time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) { /* not sent in a while; schedule conn so scheduler sends a keepalive */ kranal_schedule_conn(conn); } - /* wait twice as long for CLOSE to be sure peer is dead */ - timeout = (conn->rac_closing ? 1 : 2) * conn->rac_timeout * HZ; + timeout = conn->rac_timeout * HZ; if (!conn->rac_close_recvd && time_after_eq(now, conn->rac_last_rx + timeout)) { @@ -927,7 +905,7 @@ kranal_check_conn (kra_conn_t *conn) return -ETIMEDOUT; } - if (conn->rac_closing) + if (conn->rac_state != RANAL_CONN_ESTABLISHED) return 0; /* Check the conn's queues are moving. 
These are "belt+braces" checks, @@ -974,7 +952,7 @@ kranal_check_conn (kra_conn_t *conn) } void -kranal_check_conns (int idx, unsigned long *min_timeoutp) +kranal_reaper_check (int idx, unsigned long *min_timeoutp) { struct list_head *conns = &kranal_data.kra_conns[idx]; struct list_head *ctmp; @@ -995,23 +973,31 @@ kranal_check_conns (int idx, unsigned long *min_timeoutp) if (conn->rac_keepalive < *min_timeoutp ) *min_timeoutp = conn->rac_keepalive; - rc = kranal_check_conn(conn); + rc = kranal_check_conn_timeouts(conn); if (rc == 0) continue; kranal_conn_addref(conn); read_unlock(&kranal_data.kra_global_lock); - CERROR("Check on conn to "LPX64"failed: %d\n", - conn->rac_peer->rap_nid, rc); + CERROR("Conn to "LPX64", cqid %d timed out\n", + conn->rac_peer->rap_nid, conn->rac_cqid); write_lock_irqsave(&kranal_data.kra_global_lock, flags); - if (!conn->rac_closing) + switch (conn->rac_state) { + default: + LBUG(); + + case RANAL_CONN_ESTABLISHED: kranal_close_conn_locked(conn, -ETIMEDOUT); - else - kranal_terminate_conn_locked(conn); + break; + case RANAL_CONN_CLOSING: + kranal_terminate_conn_locked(conn); + break; + } + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); kranal_conn_decref(conn); @@ -1030,7 +1016,6 @@ kranal_connd (void *arg) wait_queue_t wait; unsigned long flags; kra_peer_t *peer; - int i; snprintf(name, sizeof(name), "kranal_connd_%02ld", (long)arg); kportal_daemonize(name); @@ -1096,8 +1081,6 @@ kranal_reaper (void *arg) { wait_queue_t wait; unsigned long flags; - kra_conn_t *conn; - kra_peer_t *peer; long timeout; int i; int conn_entries = kranal_data.kra_conn_hash_size; @@ -1113,7 +1096,6 @@ kranal_reaper (void *arg) init_waitqueue_entry(&wait, current); spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags); - kranal_data.kra_new_min_timeout = 1; while (!kranal_data.kra_shutdown) { @@ -1165,8 +1147,8 @@ kranal_reaper (void *arg) chunk = 1; for (i = 0; i < chunk; i++) { - kranal_check_conns(conn_index, - &next_min_timeout); + 
kranal_reaper_check(conn_index, + &next_min_timeout); conn_index = (conn_index + 1) % conn_entries; } @@ -1212,41 +1194,113 @@ kranal_reaper (void *arg) } void -kranal_process_rdmaq (__u32 cqid) +kranal_check_rdma_cq (kra_device_t *dev) { kra_conn_t *conn; kra_tx_t *tx; RAP_RETURN rrc; unsigned long flags; RAP_RDMA_DESCRIPTOR *desc; - - read_lock(&kranal_data.kra_global_lock); + __u32 cqid; + __u32 event_type; + + for (;;) { + rrc = RapkCQDone(dev->rad_rdma_cq, &cqid, &event_type); + if (rrc == RAP_NOT_DONE) + return; - conn = kranal_cqid2conn_locked(cqid); - LASSERT (conn != NULL); + LASSERT (rrc == RAP_SUCCESS); + LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0); - rrc = RapkRdmaDone(conn->rac_rihandle, &desc); - LASSERT (rrc == RAP_SUCCESS); + read_lock(&kranal_data.kra_global_lock); - spin_lock_irqsave(&conn->rac_lock, flags); + conn = kranal_cqid2conn_locked(cqid); + if (conn == NULL) { + /* Conn was destroyed? */ + CWARN("RDMA CQID lookup %d failed\n", cqid); + read_unlock(&kranal_data.kra_global_lock); + continue; + } - LASSERT (!list_empty(&conn->rac_rdmaq)); - tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list); - list_del(&tx->tx_list); + rrc = RapkRdmaDone(conn->rac_rihandle, &desc); + LASSERT (rrc == RAP_SUCCESS); - LASSERT(desc->AppPtr == (void *)tx); - LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE || - tx->tx_msg.ram_type == RANAL_MSG_GET_DONE); + spin_lock_irqsave(&conn->rac_lock, flags); - list_add_tail(&tx->tx_list, &conn->rac_fmaq); - tx->tx_qtime = jiffies; + LASSERT (!list_empty(&conn->rac_rdmaq)); + tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list); + list_del(&tx->tx_list); + + LASSERT(desc->AppPtr == (void *)tx); + LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE || + tx->tx_msg.ram_type == RANAL_MSG_GET_DONE); + + list_add_tail(&tx->tx_list, &conn->rac_fmaq); + tx->tx_qtime = jiffies; - spin_unlock_irqrestore(&conn->rac_lock, flags); + spin_unlock_irqrestore(&conn->rac_lock, flags); - /* Get conn's fmaq processed, now 
I've just put something there */
-        kranal_schedule_conn(conn);
+                /* Get conn's fmaq processed, now I've just put something
+                 * there */
+                kranal_schedule_conn(conn);
 
-        read_unlock(&kranal_data.kra_global_lock);
+                read_unlock(&kranal_data.kra_global_lock);
+        }
+}
+
+void
+kranal_check_fma_cq (kra_device_t *dev)
+{
+        kra_conn_t         *conn;
+        RAP_RETURN          rrc;
+        __u32               cqid;
+        __u32               event_type;
+        struct list_head   *conns;
+        struct list_head   *tmp;
+        int                 i;
+
+        for (;;) {
+                rrc = RapkCQDone(dev->rad_fma_cq, &cqid, &event_type);
+                if (rrc == RAP_NOT_DONE)
+                        return;
+
+                LASSERT (rrc == RAP_SUCCESS);
+
+                if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {
+
+                        read_lock(&kranal_data.kra_global_lock);
+
+                        conn = kranal_cqid2conn_locked(cqid);
+                        if (conn == NULL)
+                                CWARN("FMA CQID lookup %d failed\n", cqid);
+                        else
+                                kranal_schedule_conn(conn);
+
+                        read_unlock(&kranal_data.kra_global_lock);
+                        continue;
+                }
+
+                /* FMA CQ has overflowed: check ALL conns */
+                CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);
+
+                for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {
+
+                        read_lock(&kranal_data.kra_global_lock);
+
+                        conns = &kranal_data.kra_conns[i];
+
+                        list_for_each (tmp, conns) {
+                                conn = list_entry(tmp, kra_conn_t,
+                                                  rac_hashlist);
+
+                                if (conn->rac_device == dev)
+                                        kranal_schedule_conn(conn);
+                        }
+
+                        /* don't block write lockers for too long... */
+                        read_unlock(&kranal_data.kra_global_lock);
+                }
+        }
+}
 
 int
@@ -1256,12 +1310,12 @@ kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
         int sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
         RAP_RETURN rrc;
 
-        LASSERT (sizeof(*msg) <= RANAL_FMA_PREFIX_LEN);
+        LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
         LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
- immediatenob <= RANAL_FMA_MAX_DATA_LEN : + immediatenob <= RANAL_FMA_MAX_DATA : immediatenob == 0); - msg->ram_incarnation = conn->rac_my_incarnation; + msg->ram_connstamp = conn->rac_my_connstamp; msg->ram_seq = conn->rac_tx_seq; if (sync) @@ -1287,7 +1341,7 @@ kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg, } } -int +void kranal_process_fmaq (kra_conn_t *conn) { unsigned long flags; @@ -1296,27 +1350,47 @@ kranal_process_fmaq (kra_conn_t *conn) int rc; int expect_reply; - /* NB I will be rescheduled some via a rad_fma_cq event if my FMA is - * out of credits when I try to send right now... */ - - if (conn->rac_closing) { + /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now. + * However I will be rescheduled some by a rad_fma_cq event when + * I eventually get some. + * NB 2. Sampling rac_state here, races with setting it elsewhere + * kranal_close_conn_locked. But it doesn't matter if I try to + * send a "real" message just as I start closing because I'll get + * scheduled to send the close anyway. 
*/
+        if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
                 if (!list_empty(&conn->rac_rdmaq)) {
-                        /* Can't send CLOSE yet; I'm still waiting for RDMAs I
-                         * posted to finish */
+                        /* RDMAs in progress */
                         LASSERT (!conn->rac_close_sent);
-                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
-                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
-                        return 0;
+
+                        if (time_after_eq(jiffies,
+                                          conn->rac_last_tx +
+                                          conn->rac_keepalive * HZ)) {
+                                kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
+                                kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
+                        }
+                        return;
                 }
-
-                if (conn->rac_close_sent)
-                        return 0;
+                if (conn->rac_close_sent)
+                        return;
+
                 kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
                 rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
-                conn->rac_close_sent = (rc == 0);
-                return 0;
+                if (rc != 0)
+                        return;
+
+                conn->rac_close_sent = 1;
+                if (!conn->rac_close_recvd)
+                        return;
+
+                write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+                if (conn->rac_state == RANAL_CONN_CLOSING)
+                        kranal_terminate_conn_locked(conn);
+
+                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+                return;
         }
 
         spin_lock_irqsave(&conn->rac_lock, flags);
@@ -1330,7 +1404,7 @@ kranal_process_fmaq (kra_conn_t *conn)
                         kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                         kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                 }
-                return 0;
+                return;
         }
 
         tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
@@ -1379,12 +1453,12 @@ kranal_process_fmaq (kra_conn_t *conn)
         }
 
         if (rc == -EAGAIN) {
-                /* replace at the head of the list for later */
+                /* I need credits to send this.
Replace tx at the head of the + * fmaq and I'll get rescheduled when credits appear */ spin_lock_irqsave(&conn->rac_lock, flags); list_add(&tx->tx_list, &conn->rac_fmaq); spin_unlock_irqrestore(&conn->rac_lock, flags); - - return 0; + return; } LASSERT (rc == 0); @@ -1398,7 +1472,8 @@ kranal_process_fmaq (kra_conn_t *conn) spin_unlock_irqrestore(&conn->rac_lock, flags); } - return more_to_do; + if (more_to_do) + kranal_schedule_conn(conn); } static inline void @@ -1415,7 +1490,6 @@ kranal_swab_rdma_desc (kra_rdma_desc_t *d) kra_tx_t * kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie) { - unsigned long flags; struct list_head *ttmp; kra_tx_t *tx; @@ -1438,12 +1512,11 @@ kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie) return NULL; } -int -kranal_process_receives(kra_conn_t *conn) +void +kranal_check_fma_rx (kra_conn_t *conn) { unsigned long flags; __u32 seq; - __u32 nob; kra_tx_t *tx; kra_msg_t *msg; void *prefix; @@ -1451,7 +1524,7 @@ kranal_process_receives(kra_conn_t *conn) kra_peer_t *peer = conn->rac_peer; if (rrc == RAP_NOT_DONE) - return 0; + return; LASSERT (rrc == RAP_SUCCESS); conn->rac_last_rx = jiffies; @@ -1469,7 +1542,7 @@ kranal_process_receives(kra_conn_t *conn) __swab16s(&msg->ram_version); __swab16s(&msg->ram_type); __swab64s(&msg->ram_srcnid); - __swab64s(&msg->ram_incarnation); + __swab64s(&msg->ram_connstamp); __swab32s(&msg->ram_seq); /* NB message type checked below; NOT here... 
*/ @@ -1499,10 +1572,10 @@ kranal_process_receives(kra_conn_t *conn) goto out; } - if (msg->ram_incarnation != conn->rac_peer_incarnation) { - CERROR("Unexpected incarnation "LPX64"("LPX64 + if (msg->ram_connstamp != conn->rac_peer_connstamp) { + CERROR("Unexpected connstamp "LPX64"("LPX64 " expected) from "LPX64"\n", - msg->ram_incarnation, conn->rac_peer_incarnation, + msg->ram_connstamp, conn->rac_peer_connstamp, peer->rap_nid); goto out; } @@ -1514,17 +1587,23 @@ kranal_process_receives(kra_conn_t *conn) } if ((msg->ram_type & RANAL_MSG_FENCE) != 0) { - /* This message signals RDMA completion: wait now... */ + /* This message signals RDMA completion... */ rrc = RapkFmaSyncWait(conn->rac_rihandle); LASSERT (rrc == RAP_SUCCESS); } - + + if (conn->rac_close_recvd) { + CERROR("Unexpected message %d after CLOSE from "LPX64"\n", + msg->ram_type, conn->rac_peer->rap_nid); + goto out; + } + if (msg->ram_type == RANAL_MSG_CLOSE) { conn->rac_close_recvd = 1; write_lock_irqsave(&kranal_data.kra_global_lock, flags); - if (!conn->rac_closing) - kranal_close_conn_locked(conn, -ETIMEDOUT); + if (conn->rac_state == RANAL_CONN_ESTABLISHED) + kranal_close_conn_locked(conn, 0); else if (conn->rac_close_sent) kranal_terminate_conn_locked(conn); @@ -1532,7 +1611,7 @@ kranal_process_receives(kra_conn_t *conn) goto out; } - if (conn->rac_closing) + if (conn->rac_state != RANAL_CONN_ESTABLISHED) goto out; conn->rac_rxmsg = msg; /* stash message for portals callbacks */ @@ -1636,7 +1715,32 @@ kranal_process_receives(kra_conn_t *conn) if (conn->rac_rxmsg != NULL) kranal_consume_rxmsg(conn, NULL, 0); - return 1; + /* check again later */ + kranal_schedule_conn(conn); +} + +void +kranal_complete_closed_conn (kra_conn_t *conn) +{ + kra_tx_t *tx; + + LASSERT (conn->rac_state == RANAL_CONN_CLOSED); + + while (!list_empty(&conn->rac_fmaq)) { + tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list); + + list_del(&tx->tx_list); + kranal_tx_done(tx, -ECONNABORTED); + } + + LASSERT 
(list_empty(&conn->rac_rdmaq)); + + while (!list_empty(&conn->rac_replyq)) { + tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list); + + list_del(&tx->tx_list); + kranal_tx_done(tx, -ECONNABORTED); + } } int @@ -1647,19 +1751,13 @@ kranal_scheduler (void *arg) char name[16]; kra_conn_t *conn; unsigned long flags; - RAP_RETURN rrc; - int rc; - int resched; - int i; - __u32 cqid; - __u32 event_type; - int did_something; int busy_loops = 0; snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx); kportal_daemonize(name); kportal_blockallsigs(); + dev->rad_scheduler = current; init_waitqueue_entry(&wait, current); spin_lock_irqsave(&dev->rad_lock, flags); @@ -1676,64 +1774,37 @@ kranal_scheduler (void *arg) spin_lock_irqsave(&dev->rad_lock, flags); } - did_something = 0; - if (dev->rad_ready) { + /* Device callback fired since I last checked it */ dev->rad_ready = 0; spin_unlock_irqrestore(&dev->rad_lock, flags); - rrc = RapkCQDone(dev->rad_rdma_cq, &cqid, &event_type); + kranal_check_rdma_cq(dev); + kranal_check_fma_cq(dev); - LASSERT (rrc == RAP_SUCCESS || rrc == RAP_NOT_DONE); - LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0); - - if (rrc == RAP_SUCCESS) { - kranal_process_rdmaq(cqid); - did_something = 1; - } - - rrc = RapkCQDone(dev->rad_fma_cq, &cqid, &event_type); - LASSERT (rrc == RAP_SUCCESS || rrc == RAP_NOT_DONE); - - if (rrc == RAP_SUCCESS) { - if ((event_type & RAPK_CQ_EVENT_OVERRUN) != 0) - kranal_schedule_dev(dev); - else - kranal_schedule_cqid(cqid); - did_something = 1; - } - spin_lock_irqsave(&dev->rad_lock, flags); - - /* If there were no completions to handle, I leave - * rad_ready clear. NB I cleared it BEFORE I checked - * the completion queues since I'm racing with the - * device callback. 
*/ - - if (did_something) - dev->rad_ready = 1; } if (!list_empty(&dev->rad_connq)) { + /* Connection needs attention */ conn = list_entry(dev->rad_connq.next, kra_conn_t, rac_schedlist); - list_del(&conn->rac_schedlist); + list_del_init(&conn->rac_schedlist); + LASSERT (conn->rac_scheduled); + conn->rac_scheduled = 0; spin_unlock_irqrestore(&dev->rad_lock, flags); - LASSERT (conn->rac_scheduled); + kranal_check_fma_rx(conn); + kranal_process_fmaq(conn); - resched = kranal_process_fmaq(conn); - resched |= kranal_process_receives(conn); - did_something = 1; + if (conn->rac_state == RANAL_CONN_CLOSED) + kranal_complete_closed_conn(conn); + kranal_conn_decref(conn); + spin_lock_irqsave(&dev->rad_lock, flags); - if (resched) - list_add_tail(&conn->rac_schedlist, - &dev->rad_connq); - } - - if (did_something) continue; + } add_wait_queue(&dev->rad_waitq, &wait); set_current_state(TASK_INTERRUPTIBLE); @@ -1751,6 +1822,7 @@ kranal_scheduler (void *arg) spin_unlock_irqrestore(&dev->rad_lock, flags); + dev->rad_scheduler = NULL; kranal_thread_fini(); return 0; } -- 1.8.3.1