route->ksnr_deleted = 0;
route->ksnr_conn_count = 0;
route->ksnr_share_count = 0;
- route->ksnr_proto = &ksocknal_protocol_v2x;
return (route);
}
peer->ksnp_closing = 0;
peer->ksnp_accepting = 0;
peer->ksnp_zc_next_cookie = 1;
+ peer->ksnp_proto = NULL;
CFS_INIT_LIST_HEAD (&peer->ksnp_conns);
CFS_INIT_LIST_HEAD (&peer->ksnp_routes);
CFS_INIT_LIST_HEAD (&peer->ksnp_tx_queue);
write_lock_bh (&ksocknal_data.ksnd_global_lock);
+ /* always called with a ref on ni, so shutdown can't have started */
+ LASSERT (((ksock_net_t *) ni->ni_data)->ksnn_shutdown == 0);
+
peer2 = ksocknal_find_peer_locked (ni, id);
if (peer2 != NULL) {
ksocknal_peer_decref(peer);
read_lock (&ksocknal_data.ksnd_global_lock);
nip = net->ksnn_ninterfaces;
- LASSERT (nip < LNET_MAX_INTERFACES);
+ LASSERT (nip <= LNET_MAX_INTERFACES);
/* Only offer interfaces for additional connections if I have
* more than one. */
}
int
+ksocknal_connecting (ksock_peer_t *peer, __u32 ipaddr)
+{
+ ksock_route_t *route;
+
+ list_for_each_entry (route, &peer->ksnp_routes, ksnr_list) {
+
+ if (route->ksnr_ipaddr == ipaddr)
+ return route->ksnr_connecting;
+ }
+ return 0;
+}
+
+int
ksocknal_create_conn (lnet_ni_t *ni, ksock_route_t *route,
cfs_socket_t *sock, int type)
{
active = (route != NULL);
LASSERT (active == (type != SOCKLND_CONN_NONE));
- LASSERT (route == NULL || route->ksnr_proto != NULL);
irq = ksocknal_lib_sock_irq (sock);
atomic_set (&conn->ksnc_conn_refcount, 1); /* 1 ref for me */
conn->ksnc_zc_capable = ksocknal_lib_zc_capable(sock);
-
conn->ksnc_rx_ready = 0;
conn->ksnc_rx_scheduled = 0;
* eagerly */
if (active) {
- LASSERT(ni == route->ksnr_peer->ksnp_ni);
+ peer = route->ksnr_peer;
+ LASSERT(ni == peer->ksnp_ni);
/* Active connection sends HELLO eagerly */
hello->kshm_nips = ksocknal_local_ipvec(ni, hello->kshm_ips);
- peerid = route->ksnr_peer->ksnp_id;
- conn->ksnc_proto = route->ksnr_proto;
+ peerid = peer->ksnp_id;
+
+ write_lock_bh(global_lock);
+ conn->ksnc_proto = peer->ksnp_proto;
+ write_unlock_bh(global_lock);
+
+ if (conn->ksnc_proto == NULL) {
+ conn->ksnc_proto = &ksocknal_protocol_v2x;
+#if SOCKNAL_VERSION_DEBUG
+ if (*ksocknal_tunables.ksnd_protocol != 2)
+ conn->ksnc_proto = &ksocknal_protocol_v1x;
+#endif
+ }
rc = ksocknal_send_hello (ni, conn, peerid.nid, hello);
if (rc != 0)
}
rc = ksocknal_recv_hello (ni, conn, hello, &peerid, &incarnation);
- if (rc < 0) {
- if (rc == -EALREADY) {
- /* only active connection loses conn race */
- LASSERT (active);
-
- CDEBUG(D_NET, "Lost connection race with %s\n",
- libcfs_id2str(peerid));
- /* Not an actual failure: return +ve RC so active
- * connector can back off */
- rc = EALREADY;
- }
+ if (rc < 0)
goto failed_1;
- }
-
- if (active && route->ksnr_proto != conn->ksnc_proto) {
- /* Active connecting, and different protocol is returned */
- CDEBUG(D_NET, "Connecting by %d.x protocol is rejected,"
- " compatible version %d.x found.\n",
- route->ksnr_proto->pro_version,
- conn->ksnc_proto->pro_version);
- /* Not an actual failure: return +ve RC so active
- * connector can back off */
- rc = EPROTO;
- /* Retry with peer's protocol later */
- route->ksnr_proto = conn->ksnc_proto;
-
- goto failed_1;
- }
-
+ LASSERT (rc == 0 || active);
+ LASSERT (conn->ksnc_proto != NULL);
LASSERT (peerid.nid != LNET_NID_ANY);
if (active) {
- peer = route->ksnr_peer;
ksocknal_peer_addref(peer);
-
- /* additional routes after interface exchange? */
- ksocknal_create_routes(peer, conn->ksnc_port,
- hello->kshm_ips, hello->kshm_nips);
-
- /* setup the socket AFTER I've received hello (it disables
- * SO_LINGER). I might call back to the acceptor who may want
- * to send a protocol version response and then close the
- * socket; this ensures the socket only tears down after the
- * response has been sent. */
- rc = ksocknal_lib_setup_sock(sock);
-
write_lock_bh (global_lock);
-
- if (rc != 0)
- goto failed_2;
} else {
rc = ksocknal_create_peer(&peer, ni, peerid);
if (rc != 0)
write_lock_bh (global_lock);
+ /* called with a ref on ni, so shutdown can't have started */
+ LASSERT (((ksock_net_t *) ni->ni_data)->ksnn_shutdown == 0);
+
peer2 = ksocknal_find_peer_locked(ni, peerid);
if (peer2 == NULL) {
/* NB this puts an "empty" peer in the peer
/* Am I already connecting to this guy? Resolve in
* favour of higher NID... */
- rc = 0;
- if (peerid.nid < ni->ni_nid) {
- list_for_each(tmp, &peer->ksnp_routes) {
- route = list_entry(tmp, ksock_route_t,
- ksnr_list);
-
- if (route->ksnr_ipaddr != conn->ksnc_ipaddr)
- continue;
-
- if (route->ksnr_connecting) {
- rc = EALREADY; /* not a failure */
- warn = "connection race";
- }
-
- break;
- }
- }
- route = NULL;
-
- write_unlock_bh (global_lock);
-
- if (rc != 0) {
- /* set CONN_NONE makes returned HELLO acknowledge I
- * lost a connection race */
- conn->ksnc_type = SOCKLND_CONN_NONE;
- hello->kshm_nips = 0;
- ksocknal_send_hello(ni, conn, peerid.nid, hello);
- } else {
- hello->kshm_nips = ksocknal_select_ips(peer, hello->kshm_ips,
- hello->kshm_nips);
- rc = ksocknal_send_hello(ni, conn, peerid.nid, hello);
-
- /* Setup the socket (it disables SO_LINGER). I don't
- * do it if I'm sending a negative response to ensure
- * the response isn't discarded when I close the socket
- * immediately after sending it. */
- if (rc == 0)
- rc = ksocknal_lib_setup_sock(sock);
- }
-
- write_lock_bh (global_lock);
- peer->ksnp_accepting--;
-
- if (rc != 0)
+ if (peerid.nid < ni->ni_nid &&
+ ksocknal_connecting(peer, conn->ksnc_ipaddr)) {
+ rc = EALREADY;
+ warn = "connection race resolution";
goto failed_2;
+ }
}
if (peer->ksnp_closing ||
goto failed_2;
}
+ if (peer->ksnp_proto == NULL) {
+ /* Never connected before.
+ * NB recv_hello may have returned EPROTO to signal my peer
+ * wants a different protocol than the one I asked for.
+ */
+ LASSERT (list_empty(&peer->ksnp_conns));
+
+ peer->ksnp_proto = conn->ksnc_proto;
+ peer->ksnp_incarnation = incarnation;
+ }
+
+ if (peer->ksnp_proto != conn->ksnc_proto ||
+ peer->ksnp_incarnation != incarnation) {
+ /* Peer rebooted or I've got the wrong protocol version */
+ ksocknal_close_peer_conns_locked(peer, 0, 0);
+
+ peer->ksnp_proto = NULL;
+ rc = ESTALE;
+ warn = peer->ksnp_incarnation != incarnation ?
+ "peer rebooted" :
+ "wrong proto version";
+ goto failed_2;
+ }
+
+ switch (rc) {
+ default:
+ LBUG();
+ case 0:
+ break;
+ case EALREADY:
+ warn = "lost conn race";
+ goto failed_2;
+ case EPROTO:
+ warn = "retry with different protocol version";
+ goto failed_2;
+ }
+
/* Refuse to duplicate an existing connection, unless this is a
* loopback connection */
if (conn->ksnc_ipaddr != conn->ksnc_myipaddr) {
if (conn2->ksnc_ipaddr != conn->ksnc_ipaddr ||
conn2->ksnc_myipaddr != conn->ksnc_myipaddr ||
- conn2->ksnc_type != conn->ksnc_type ||
- conn2->ksnc_incarnation != incarnation)
+ conn2->ksnc_type != conn->ksnc_type)
continue;
- rc = 0; /* more of a NOOP than a failure */
+ /* Reply on a passive connection attempt so the peer
+ * realises we're connected. */
+ LASSERT (rc == 0);
+ if (!active)
+ rc = EALREADY;
+
warn = "duplicate";
goto failed_2;
}
}
conn->ksnc_peer = peer; /* conn takes my ref on peer */
- conn->ksnc_incarnation = incarnation;
peer->ksnp_last_alive = cfs_time_current();
peer->ksnp_error = 0;
ksocknal_new_packet(conn, 0);
- /* NB my callbacks block while I hold ksnd_global_lock */
- ksocknal_lib_set_callback(sock, conn);
-
/* Take all the packets blocking for a connection.
* NB, it might be nicer to share these blocked packets among any
* other connections that are becoming established. */
ksocknal_queue_tx_locked (tx, conn);
}
- rc = ksocknal_close_stale_conns_locked(peer, incarnation);
write_unlock_bh (global_lock);
- if (rc != 0)
- CDEBUG(D_NET, "Closed %d stale conns to %s ip %d.%d.%d.%d\n",
- rc, libcfs_id2str(conn->ksnc_peer->ksnp_id),
- HIPQUAD(conn->ksnc_ipaddr));
+ /* We've now got a new connection. Any errors from here on are just
+ * like "normal" comms errors and we close the connection normally.
+ * NB (a) we still have to send the reply HELLO for passive
+ * connections,
+ * (b) normal I/O on the conn is blocked until I setup and call the
+ * socket callbacks.
+ */
ksocknal_lib_bind_irq (irq);
- /* Call the callbacks right now to get things going. */
- if (ksocknal_connsock_addref(conn) == 0) {
- ksocknal_read_callback(conn);
- ksocknal_write_callback(conn);
- ksocknal_connsock_decref(conn);
- }
-
- CDEBUG(D_NET, "New conn %s %u.%u.%u.%u -> %u.%u.%u.%u/%d"
+ CDEBUG(D_NET, "New conn %s p %d.x %u.%u.%u.%u -> %u.%u.%u.%u/%d"
" incarnation:"LPD64" sched[%d]/%d\n",
- libcfs_id2str(peerid), HIPQUAD(conn->ksnc_myipaddr),
- HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port, incarnation,
+ libcfs_id2str(peerid), conn->ksnc_proto->pro_version,
+ HIPQUAD(conn->ksnc_myipaddr), HIPQUAD(conn->ksnc_ipaddr),
+ conn->ksnc_port, incarnation,
(int)(conn->ksnc_scheduler - ksocknal_data.ksnd_schedulers), irq);
+ if (active) {
+ /* additional routes after interface exchange? */
+ ksocknal_create_routes(peer, conn->ksnc_port,
+ hello->kshm_ips, hello->kshm_nips);
+ } else {
+ hello->kshm_nips = ksocknal_select_ips(peer, hello->kshm_ips,
+ hello->kshm_nips);
+ rc = ksocknal_send_hello(ni, conn, peerid.nid, hello);
+ }
+
LIBCFS_FREE(hello, offsetof(ksock_hello_msg_t,
kshm_ips[LNET_MAX_INTERFACES]));
+ /* setup the socket AFTER I've received hello (it disables
+ * SO_LINGER). I might call back to the acceptor who may want
+ * to send a protocol version response and then close the
+ * socket; this ensures the socket only tears down after the
+ * response has been sent. */
+ if (rc == 0)
+ rc = ksocknal_lib_setup_sock(sock);
+
+ write_lock_bh(global_lock);
+
+ /* NB my callbacks block while I hold ksnd_global_lock */
+ ksocknal_lib_set_callback(sock, conn);
+
+ if (!active)
+ peer->ksnp_accepting--;
+
+ write_unlock_bh(global_lock);
+
+ if (rc != 0) {
+ write_lock_bh(global_lock);
+ ksocknal_close_conn_locked(conn, rc);
+ write_unlock_bh(global_lock);
+ } else if (ksocknal_connsock_addref(conn) == 0) {
+ /* Allow I/O to proceed. */
+ ksocknal_read_callback(conn);
+ ksocknal_write_callback(conn);
+ ksocknal_connsock_decref(conn);
+ }
+
ksocknal_conn_decref(conn);
- return (0);
+ return rc;
failed_2:
if (!peer->ksnp_closing &&
libcfs_id2str(peerid), conn->ksnc_type, warn);
}
+ if (!active) {
+ if (rc > 0) {
+ /* Request retry by replying with CONN_NONE
+ * ksnc_proto has been set already */
+ conn->ksnc_type = SOCKLND_CONN_NONE;
+ hello->kshm_nips = 0;
+ ksocknal_send_hello(ni, conn, peerid.nid, hello);
+ }
+
+ write_lock_bh(global_lock);
+ peer->ksnp_accepting--;
+ write_unlock_bh(global_lock);
+ }
+
ksocknal_txlist_done(ni, &zombies, 1);
ksocknal_peer_decref(peer);
if (list_empty (&peer->ksnp_conns)) {
/* No more connections to this peer */
+ peer->ksnp_proto = NULL; /* renegotiate protocol version */
peer->ksnp_error = error; /* stash last conn close reason */
if (list_empty (&peer->ksnp_routes)) {
peer->ksnp_accepting == 0 &&
ksocknal_find_connecting_route_locked(peer) == NULL) {
notify = 1;
- last_alive = cfs_time_current_sec() -
- cfs_duration_sec(cfs_time_current() -
- peer->ksnp_last_alive);
+ last_alive = cfs_time_seconds(peer->ksnp_last_alive);
}
read_unlock (&ksocknal_data.ksnd_global_lock);
}
int
-ksocknal_close_stale_conns_locked (ksock_peer_t *peer, __u64 incarnation)
-{
- ksock_conn_t *conn;
- struct list_head *ctmp;
- struct list_head *cnxt;
- int count = 0;
-
- list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
- conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
-
- if (conn->ksnc_incarnation == incarnation)
- continue;
-
- CDEBUG(D_NET, "Closing stale conn %s ip:%08x/%d "
- "incarnation:"LPD64"("LPD64")\n",
- libcfs_id2str(peer->ksnp_id),
- conn->ksnc_ipaddr, conn->ksnc_port,
- conn->ksnc_incarnation, incarnation);
-
- count++;
- ksocknal_close_conn_locked (conn, -ESTALE);
- }
-
- return (count);
-}
-
-int
ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why)
{
ksock_peer_t *peer = conn->ksnc_peer;
}
void
+ksocknal_debug_peerhash (lnet_ni_t *ni)
+{
+ ksock_peer_t *peer = NULL;
+ struct list_head *tmp;
+ int i;
+
+ read_lock (&ksocknal_data.ksnd_global_lock);
+
+ for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
+ list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
+ peer = list_entry (tmp, ksock_peer_t, ksnp_list);
+
+ if (peer->ksnp_ni == ni) break;
+
+ peer = NULL;
+ }
+ }
+
+ if (peer != NULL) {
+ ksock_route_t *route;
+ ksock_conn_t *conn;
+
+ CWARN ("Active peer on shutdown: %s, ref %d, scnt %d, "
+ "closing %d, accepting %d, err %d, zcookie "LPU64", "
+ "txq %d, zc_req %d\n", libcfs_id2str(peer->ksnp_id),
+ atomic_read(&peer->ksnp_refcount),
+ peer->ksnp_sharecount, peer->ksnp_closing,
+ peer->ksnp_accepting, peer->ksnp_error,
+ peer->ksnp_zc_next_cookie,
+ !list_empty(&peer->ksnp_tx_queue),
+ !list_empty(&peer->ksnp_zc_req_list));
+
+ list_for_each (tmp, &peer->ksnp_routes) {
+ route = list_entry(tmp, ksock_route_t, ksnr_list);
+ CWARN ("Route: ref %d, schd %d, conn %d, cnted %d, "
+ "del %d\n", atomic_read(&route->ksnr_refcount),
+ route->ksnr_scheduled, route->ksnr_connecting,
+ route->ksnr_connected, route->ksnr_deleted);
+ }
+
+ list_for_each (tmp, &peer->ksnp_conns) {
+ conn = list_entry(tmp, ksock_conn_t, ksnc_list);
+ CWARN ("Conn: ref %d, sref %d, t %d, c %d\n",
+ atomic_read(&conn->ksnc_conn_refcount),
+ atomic_read(&conn->ksnc_sock_refcount),
+ conn->ksnc_type, conn->ksnc_closing);
+ }
+ }
+
+ read_unlock (&ksocknal_data.ksnd_global_lock);
+ return;
+}
+
+void
ksocknal_shutdown (lnet_ni_t *ni)
{
ksock_net_t *net = ni->ni_data;
net->ksnn_npeers);
cfs_pause(cfs_time_seconds(1));
+ ksocknal_debug_peerhash(ni);
+
spin_lock_bh (&net->ksnn_lock);
}
spin_unlock_bh (&net->ksnn_lock);