X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lnet%2Fklnds%2Fralnd%2Fralnd.c;h=0da7af4faff1abd17385da2aba6937069fe7ec14;hp=02c33630e73108bf088ab9eae4939167f0bc71ed;hb=6815097cbb06aa1a727e6bf7a8ee9e916a33ee6d;hpb=0ffa249e8f3811a9f1c0c3803bdc8e6fb8435f43 diff --git a/lnet/klnds/ralnd/ralnd.c b/lnet/klnds/ralnd/ralnd.c index 02c33630..0da7af4 100644 --- a/lnet/klnds/ralnd/ralnd.c +++ b/lnet/klnds/ralnd/ralnd.c @@ -22,13 +22,14 @@ */ #include "ranal.h" +static int kranal_devids[] = {RAPK_MAIN_DEVICE_ID, + RAPK_EXPANSION_DEVICE_ID}; nal_t kranal_api; ptl_handle_ni_t kranal_ni; kra_data_t kranal_data; kra_tunables_t kranal_tunables; -#ifdef CONFIG_SYSCTL #define RANAL_SYSCTL_TIMEOUT 1 #define RANAL_SYSCTL_LISTENER_TIMEOUT 2 #define RANAL_SYSCTL_BACKLOG 3 @@ -38,10 +39,10 @@ kra_tunables_t kranal_tunables; #define RANAL_SYSCTL 202 static ctl_table kranal_ctl_table[] = { - {RANAL_SYSCTL_TIMEOUT, "timeout", + {RANAL_SYSCTL_TIMEOUT, "timeout", &kranal_tunables.kra_timeout, sizeof(int), 0644, NULL, &proc_dointvec}, - {RANAL_SYSCTL_LISTENER_TIMEOUT, "listener_timeout", + {RANAL_SYSCTL_LISTENER_TIMEOUT, "listener_timeout", &kranal_tunables.kra_listener_timeout, sizeof(int), 0644, NULL, &proc_dointvec}, {RANAL_SYSCTL_BACKLOG, "backlog", @@ -50,7 +51,7 @@ static ctl_table kranal_ctl_table[] = { {RANAL_SYSCTL_PORT, "port", &kranal_tunables.kra_port, sizeof(int), 0644, NULL, kranal_listener_procint}, - {RANAL_SYSCTL_MAX_IMMEDIATE, "max_immediate", + {RANAL_SYSCTL_MAX_IMMEDIATE, "max_immediate", &kranal_tunables.kra_max_immediate, sizeof(int), 0644, NULL, &proc_dointvec}, { 0 } @@ -60,7 +61,6 @@ static ctl_table kranal_top_ctl_table[] = { {RANAL_SYSCTL, "ranal", NULL, 0, 0555, kranal_ctl_table}, { 0 } }; -#endif int kranal_sock_write (struct socket *sock, void *buffer, int nob) @@ -89,6 +89,12 @@ kranal_sock_write (struct socket *sock, void *buffer, int nob) rc = sock_sendmsg(sock, &msg, iov.iov_len); set_fs(oldmm); + if (rc == nob) + return 0; + + if (rc >= 0) + return -EAGAIN; + return rc; } @@ -208,7 +214,7 @@ kranal_pause(int ticks) } void -kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn) +kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn, ptl_nid_t dstnid) { RAP_RETURN rrc; @@ -217,7 +223,8 @@ kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn) connreq->racr_magic = RANAL_MSG_MAGIC; connreq->racr_version = RANAL_MSG_VERSION; connreq->racr_devid = conn->rac_device->rad_id; - connreq->racr_nid = kranal_lib.libnal_ni.ni_pid.nid; + connreq->racr_srcnid = kranal_lib.libnal_ni.ni_pid.nid; + connreq->racr_dstnid = dstnid; connreq->racr_peerstamp = kranal_data.kra_peerstamp; connreq->racr_connstamp = conn->rac_my_connstamp; connreq->racr_timeout = conn->rac_timeout; @@ -246,13 +253,14 @@ kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout) __swab32s(&connreq->racr_magic); __swab16s(&connreq->racr_version); __swab16s(&connreq->racr_devid); - __swab64s(&connreq->racr_nid); + __swab64s(&connreq->racr_srcnid); + __swab64s(&connreq->racr_dstnid); __swab64s(&connreq->racr_peerstamp); __swab64s(&connreq->racr_connstamp); __swab32s(&connreq->racr_timeout); + __swab32s(&connreq->racr_riparams.HostId); __swab32s(&connreq->racr_riparams.FmaDomainHndl); - __swab32s(&connreq->racr_riparams.RcvCqHndl); __swab32s(&connreq->racr_riparams.PTag); __swab32s(&connreq->racr_riparams.CompletionCookie); } @@ -262,7 +270,8 @@ kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout) return -EPROTO; } - if (connreq->racr_nid == PTL_NID_ANY) { + if (connreq->racr_srcnid == PTL_NID_ANY || + connreq->racr_dstnid == PTL_NID_ANY) { CERROR("Received PTL_NID_ANY\n"); return -EPROTO; } @@ -272,7 +281,7 @@ kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout) connreq->racr_timeout, RANAL_MIN_TIMEOUT); return -EPROTO; } - + return 0; } @@ -305,16 +314,16 @@ kranal_close_stale_conns_locked (kra_peer_t *peer, kra_conn_t *newconn) if (conn->rac_device != newconn->rac_device) continue; - + if (loopback && newconn->rac_my_connstamp == conn->rac_peer_connstamp && newconn->rac_peer_connstamp == conn->rac_my_connstamp) continue; - + LASSERT (conn->rac_peer_connstamp < newconn->rac_peer_connstamp); CDEBUG(D_NET, "Closing stale conn nid:"LPX64 - " connstamp:"LPX64"("LPX64")\n", peer->rap_nid, + " connstamp:"LPX64"("LPX64")\n", peer->rap_nid, conn->rac_peer_connstamp, newconn->rac_peer_connstamp); count++; @@ -332,7 +341,7 @@ kranal_conn_isdup_locked(kra_peer_t *peer, kra_conn_t *newconn) int loopback; loopback = peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid; - + list_for_each(tmp, &peer->rap_conns) { conn = list_entry(tmp, kra_conn_t, rac_list); @@ -358,12 +367,12 @@ kranal_conn_isdup_locked(kra_peer_t *peer, kra_conn_t *newconn) /* 'newconn' is an earlier connection from 'peer'!!! */ if (newconn->rac_peer_connstamp < conn->rac_peer_connstamp) return 2; - + /* 'conn' is an earlier connection from 'peer': it will be * removed when we cull stale conns later on... */ if (newconn->rac_peer_connstamp > conn->rac_peer_connstamp) continue; - + /* 'newconn' has the SAME connection stamp; 'peer' isn't * playing the game... */ return 3; @@ -384,7 +393,6 @@ kranal_set_conn_uniqueness (kra_conn_t *conn) do { /* allocate a unique cqid */ conn->rac_cqid = kranal_data.kra_next_cqid++; } while (kranal_cqid2conn_locked(conn->rac_cqid) != NULL); - write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); } @@ -413,12 +421,11 @@ kranal_create_conn(kra_conn_t **connp, kra_device_t *dev) kranal_set_conn_uniqueness(conn); + conn->rac_device = dev; conn->rac_timeout = MAX(kranal_tunables.kra_timeout, RANAL_MIN_TIMEOUT); kranal_update_reaper_timeout(conn->rac_timeout); rrc = RapkCreateRi(dev->rad_handle, conn->rac_cqid, - dev->rad_ptag, - dev->rad_rdma_cq, dev->rad_fma_cq, &conn->rac_rihandle); if (rrc != RAP_SUCCESS) { CERROR("RapkCreateRi failed: %d\n", rrc); @@ -432,7 +439,7 @@ kranal_create_conn(kra_conn_t **connp, kra_device_t *dev) } void -kranal_destroy_conn(kra_conn_t *conn) +kranal_destroy_conn(kra_conn_t *conn) { RAP_RETURN rrc; @@ -496,9 +503,11 @@ kranal_close_conn_locked (kra_conn_t *conn, int error) /* Non-persistent peer with no more conns... */ kranal_unlink_peer_locked(peer); } - + /* Reset RX timeout to ensure we wait for an incoming CLOSE for the - * full timeout */ + * full timeout. If we get a CLOSE we know the peer has stopped all + * RDMA. Otherwise if we wait for the full timeout we can also be sure + * all RDMA has stopped. */ conn->rac_last_rx = jiffies; mb(); @@ -512,29 +521,46 @@ void kranal_close_conn (kra_conn_t *conn, int error) { unsigned long flags; - + write_lock_irqsave(&kranal_data.kra_global_lock, flags); - + if (conn->rac_state == RANAL_CONN_ESTABLISHED) kranal_close_conn_locked(conn, error); - + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); } int -kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq, +kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq, __u32 peer_ip, int peer_port) { - RAP_RETURN rrc; - + kra_device_t *dev = conn->rac_device; + unsigned long flags; + RAP_RETURN rrc; + + /* CAVEAT EMPTOR: we're really overloading rac_last_tx + rac_keepalive + * to do RapkCompleteSync() timekeeping (see kibnal_scheduler). */ + conn->rac_last_tx = jiffies; + conn->rac_keepalive = 0; + + /* Schedule conn on rad_new_conns */ + kranal_conn_addref(conn); + spin_lock_irqsave(&dev->rad_lock, flags); + list_add_tail(&conn->rac_schedlist, &dev->rad_new_conns); + wake_up(&dev->rad_waitq); + spin_unlock_irqrestore(&dev->rad_lock, flags); + rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams); if (rrc != RAP_SUCCESS) { - CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n", + CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n", HIPQUAD(peer_ip), peer_port, rrc); - return -EPROTO; + return -ECONNABORTED; } - + + /* Scheduler doesn't touch conn apart from to deschedule and decref it + * after RapkCompleteSync() return success, so conn is all mine */ + conn->rac_peerstamp = connreq->racr_peerstamp; conn->rac_peer_connstamp = connreq->racr_connstamp; conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq->racr_timeout); @@ -543,14 +569,14 @@ kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq, } int -kranal_passive_conn_handshake (struct socket *sock, - ptl_nid_t *peer_nidp, kra_conn_t **connp) +kranal_passive_conn_handshake (struct socket *sock, ptl_nid_t *src_nidp, + ptl_nid_t *dst_nidp, kra_conn_t **connp) { struct sockaddr_in addr; __u32 peer_ip; unsigned int peer_port; - kra_connreq_t connreq; - ptl_nid_t peer_nid; + kra_connreq_t rx_connreq; + kra_connreq_t tx_connreq; kra_conn_t *conn; kra_device_t *dev; int rc; @@ -573,25 +599,22 @@ kranal_passive_conn_handshake (struct socket *sock, return -ECONNREFUSED; } - rc = kranal_recv_connreq(sock, &connreq, + rc = kranal_recv_connreq(sock, &rx_connreq, kranal_tunables.kra_listener_timeout); if (rc != 0) { - CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", + CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", HIPQUAD(peer_ip), peer_port, rc); return rc; } - peer_nid = connreq.racr_nid; - LASSERT (peer_nid != PTL_NID_ANY); - for (i = 0;;i++) { if (i == kranal_data.kra_ndevs) { CERROR("Can't match dev %d from %u.%u.%u.%u/%d\n", - connreq.racr_devid, HIPQUAD(peer_ip), peer_port); + rx_connreq.racr_devid, HIPQUAD(peer_ip), peer_port); return -ENODEV; } dev = &kranal_data.kra_devices[i]; - if (dev->rad_id == connreq.racr_devid) + if (dev->rad_id == rx_connreq.racr_devid) break; } @@ -599,24 +622,25 @@ kranal_passive_conn_handshake (struct socket *sock, if (rc != 0) return rc; - rc = kranal_set_conn_params(conn, &connreq, peer_ip, peer_port); + kranal_pack_connreq(&tx_connreq, conn, rx_connreq.racr_srcnid); + + rc = kranal_sock_write(sock, &tx_connreq, sizeof(tx_connreq)); if (rc != 0) { + CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n", + HIPQUAD(peer_ip), peer_port, rc); kranal_conn_decref(conn); return rc; } - kranal_pack_connreq(&connreq, conn); - - rc = kranal_sock_write(sock, &connreq, sizeof(connreq)); + rc = kranal_set_conn_params(conn, &rx_connreq, peer_ip, peer_port); if (rc != 0) { - CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n", - HIPQUAD(peer_ip), peer_port, rc); kranal_conn_decref(conn); return rc; } *connp = conn; - *peer_nidp = peer_nid; + *src_nidp = rx_connreq.racr_srcnid; + *dst_nidp = rx_connreq.racr_dstnid; return 0; } @@ -631,8 +655,8 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp) for (port = 1023; port >= 512; port--) { - memset(&locaddr, 0, sizeof(locaddr)); - locaddr.sin_family = AF_INET; + memset(&locaddr, 0, sizeof(locaddr)); + locaddr.sin_family = AF_INET; locaddr.sin_port = htons(port); locaddr.sin_addr.s_addr = htonl(INADDR_ANY); @@ -649,7 +673,7 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp) (struct sockaddr *)&locaddr, sizeof(locaddr)); if (rc != 0) { sock_release(sock); - + if (rc == -EADDRINUSE) { CDEBUG(D_NET, "Port %d already in use\n", port); continue; @@ -666,7 +690,7 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp) *sockp = sock; return 0; } - + sock_release(sock); if (rc != -EADDRNOTAVAIL) { @@ -674,8 +698,8 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp) port, HIPQUAD(peer->rap_ip), peer->rap_port, rc); return rc; } - - CDEBUG(D_NET, "Port %d not available for %u.%u.%u.%u/%d\n", + + CDEBUG(D_NET, "Port %d not available for %u.%u.%u.%u/%d\n", port, HIPQUAD(peer->rap_ip), peer->rap_port); } @@ -685,7 +709,8 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp) int -kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp) +kranal_active_conn_handshake(kra_peer_t *peer, + ptl_nid_t *dst_nidp, kra_conn_t **connp) { kra_connreq_t connreq; kra_conn_t *conn; @@ -703,8 +728,8 @@ kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp) if (rc != 0) return rc; - kranal_pack_connreq(&connreq, conn); - + kranal_pack_connreq(&connreq, conn, peer->rap_nid); + rc = ranal_connect_sock(peer, &sock); if (rc != 0) goto failed_0; @@ -715,14 +740,14 @@ kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp) rc = kranal_sock_write(sock, &connreq, sizeof(connreq)); if (rc != 0) { - CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n", + CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n", HIPQUAD(peer->rap_ip), peer->rap_port, rc); goto failed_1; } rc = kranal_recv_connreq(sock, &connreq, kranal_tunables.kra_timeout); if (rc != 0) { - CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", + CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", HIPQUAD(peer->rap_ip), peer->rap_port, rc); goto failed_1; } @@ -730,28 +755,29 @@ kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp) sock_release(sock); rc = -EPROTO; - if (connreq.racr_nid != peer->rap_nid) { - CERROR("Unexpected nid from %u.%u.%u.%u/%d: " + if (connreq.racr_srcnid != peer->rap_nid) { + CERROR("Unexpected srcnid from %u.%u.%u.%u/%d: " "received "LPX64" expected "LPX64"\n", - HIPQUAD(peer->rap_ip), peer->rap_port, - connreq.racr_nid, peer->rap_nid); + HIPQUAD(peer->rap_ip), peer->rap_port, + connreq.racr_srcnid, peer->rap_nid); goto failed_0; } if (connreq.racr_devid != dev->rad_id) { CERROR("Unexpected device id from %u.%u.%u.%u/%d: " "received %d expected %d\n", - HIPQUAD(peer->rap_ip), peer->rap_port, + HIPQUAD(peer->rap_ip), peer->rap_port, connreq.racr_devid, dev->rad_id); goto failed_0; } - rc = kranal_set_conn_params(conn, &connreq, + rc = kranal_set_conn_params(conn, &connreq, peer->rap_ip, peer->rap_port); if (rc != 0) goto failed_0; *connp = conn; + *dst_nidp = connreq.racr_dstnid; return 0; failed_1: @@ -767,17 +793,19 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer) kra_peer_t *peer2; kra_tx_t *tx; ptl_nid_t peer_nid; + ptl_nid_t dst_nid; unsigned long flags; kra_conn_t *conn; int rc; int nstale; + int new_peer = 0; if (sock == NULL) { /* active: connd wants to connect to 'peer' */ LASSERT (peer != NULL); LASSERT (peer->rap_connecting); - - rc = kranal_active_conn_handshake(peer, &conn); + + rc = kranal_active_conn_handshake(peer, &dst_nid, &conn); if (rc != 0) return rc; @@ -785,19 +813,19 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer) if (!kranal_peer_active(peer)) { /* raced with peer getting unlinked */ - write_unlock_irqrestore(&kranal_data.kra_global_lock, + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); kranal_conn_decref(conn); - return ESTALE; + return -ESTALE; } peer_nid = peer->rap_nid; - } else { /* passive: listener accepted 'sock' */ LASSERT (peer == NULL); - rc = kranal_passive_conn_handshake(sock, &peer_nid, &conn); + rc = kranal_passive_conn_handshake(sock, &peer_nid, + &dst_nid, &conn); if (rc != 0) return rc; @@ -813,26 +841,32 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer) peer2 = kranal_find_peer_locked(peer_nid); if (peer2 == NULL) { - /* peer table takes my initial ref on peer */ - list_add_tail(&peer->rap_list, - kranal_nid2peerlist(peer_nid)); + new_peer = 1; } else { /* peer_nid already in the peer table */ kranal_peer_decref(peer); peer = peer2; } - /* NB I may now have a non-persistent peer in the peer - * table with no connections: I can't drop the global lock - * until I've given it a connection or removed it, and when - * I do 'peer' can disappear under me. */ } - LASSERT (kranal_peer_active(peer)); /* peer is in the peer table */ + LASSERT ((!new_peer) != (!kranal_peer_active(peer))); + + /* Refuse connection if peer thinks we are a different NID. We check + * this while holding the global lock, to synch with connection + * destruction on NID change. */ + if (dst_nid != kranal_lib.libnal_ni.ni_pid.nid) { + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); + + CERROR("Stale/bad connection with "LPX64 + ": dst_nid "LPX64", expected "LPX64"\n", + peer_nid, dst_nid, kranal_lib.libnal_ni.ni_pid.nid); + rc = -ESTALE; + goto failed; + } - /* Refuse to duplicate an existing connection (both sides might try - * to connect at once). NB we return success! We _do_ have a - * connection (so we don't need to remove the peer from the peer - * table) and we _don't_ have any blocked txs to complete */ + /* Refuse to duplicate an existing connection (both sides might try to + * connect at once). NB we return success! We _are_ connected so we + * _don't_ have any blocked txs to complete with failure. */ rc = kranal_conn_isdup_locked(peer, conn); if (rc != 0) { LASSERT (!list_empty(&peer->rap_conns)); @@ -840,10 +874,19 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer) write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); CWARN("Not creating duplicate connection to "LPX64": %d\n", peer_nid, rc); - kranal_conn_decref(conn); - return 0; + rc = 0; + goto failed; } + if (new_peer) { + /* peer table takes my ref on the new peer */ + list_add_tail(&peer->rap_list, + kranal_nid2peerlist(peer_nid)); + } + + /* initialise timestamps before reaper looks at them */ + conn->rac_last_tx = conn->rac_last_rx = jiffies; + kranal_peer_addref(peer); /* +1 ref for conn */ conn->rac_peer = peer; list_add_tail(&conn->rac_list, &peer->rap_conns); @@ -854,7 +897,7 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer) /* Schedule all packets blocking for a connection */ while (!list_empty(&peer->rap_tx_queue)) { - tx = list_entry(&peer->rap_tx_queue.next, + tx = list_entry(peer->rap_tx_queue.next, kra_tx_t, tx_list); list_del(&tx->tx_list); @@ -870,10 +913,19 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer) if (nstale != 0) CWARN("Closed %d stale conns to "LPX64"\n", nstale, peer_nid); + CDEBUG(D_WARNING, "New connection to "LPX64" on devid[%d] = %d\n", + peer_nid, conn->rac_device->rad_idx, conn->rac_device->rad_id); + /* Ensure conn gets checked. Transmits may have been queued and an * FMA event may have happened before it got in the cq hash table */ kranal_schedule_conn(conn); return 0; + + failed: + if (new_peer) + kranal_peer_decref(peer); + kranal_conn_decref(conn); + return rc; } void @@ -886,8 +938,12 @@ kranal_connect (kra_peer_t *peer) LASSERT (peer->rap_connecting); + CDEBUG(D_NET, "About to handshake "LPX64"\n", peer->rap_nid); + rc = kranal_conn_handshake(NULL, peer); + CDEBUG(D_NET, "Done handshake "LPX64":%d \n", peer->rap_nid, rc); + write_lock_irqsave(&kranal_data.kra_global_lock, flags); LASSERT (peer->rap_connecting); @@ -900,14 +956,14 @@ kranal_connect (kra_peer_t *peer) /* reset reconnection timeouts */ peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL; - peer->rap_reconnect_time = CURRENT_TIME; + peer->rap_reconnect_time = CURRENT_SECONDS; write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); return; } LASSERT (peer->rap_reconnect_interval != 0); - peer->rap_reconnect_time = CURRENT_TIME + peer->rap_reconnect_interval; + peer->rap_reconnect_time = CURRENT_SECONDS + peer->rap_reconnect_interval; peer->rap_reconnect_interval = MAX(RANAL_MAX_RECONNECT_INTERVAL, 1 * peer->rap_reconnect_interval); @@ -932,17 +988,24 @@ kranal_connect (kra_peer_t *peer) } while (!list_empty(&zombies)); } +void +kranal_free_acceptsock (kra_acceptsock_t *ras) +{ + sock_release(ras->ras_sock); + PORTAL_FREE(ras, sizeof(*ras)); +} + int -kranal_listener(void *arg) +kranal_listener (void *arg) { struct sockaddr_in addr; wait_queue_t wait; struct socket *sock; - struct socket *newsock; + kra_acceptsock_t *ras; int port; - kra_connreq_t *connreqs; char name[16]; int rc; + unsigned long flags; /* Parent thread holds kra_nid_mutex, and is, or is about to * block on kra_listener_signal */ @@ -954,14 +1017,9 @@ kranal_listener(void *arg) init_waitqueue_entry(&wait, current); - rc = -ENOMEM; - PORTAL_ALLOC(connreqs, 2 * sizeof(*connreqs)); - if (connreqs == NULL) - goto out_0; - rc = kranal_create_sock(&sock); if (rc != 0) - goto out_1; + goto out_0; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; @@ -971,14 +1029,14 @@ kranal_listener(void *arg) rc = sock->ops->bind(sock, (struct sockaddr *)&addr, sizeof(addr)); if (rc != 0) { CERROR("Can't bind to port %d\n", port); - goto out_2; + goto out_1; } rc = sock->ops->listen(sock, kranal_tunables.kra_backlog); if (rc != 0) { - CERROR("Can't set listen backlog %d: %d\n", + CERROR("Can't set listen backlog %d: %d\n", kranal_tunables.kra_backlog, rc); - goto out_2; + goto out_1; } LASSERT (kranal_data.kra_listener_sock == NULL); @@ -990,48 +1048,76 @@ kranal_listener(void *arg) /* Wake me any time something happens on my socket */ add_wait_queue(sock->sk->sk_sleep, &wait); + ras = NULL; while (kranal_data.kra_listener_shutdown == 0) { - newsock = sock_alloc(); - if (newsock == NULL) { - CERROR("Can't allocate new socket for accept\n"); - kranal_pause(HZ); - continue; + if (ras == NULL) { + PORTAL_ALLOC(ras, sizeof(*ras)); + if (ras == NULL) { + CERROR("Out of Memory: pausing...\n"); + kranal_pause(HZ); + continue; + } + ras->ras_sock = NULL; + } + + if (ras->ras_sock == NULL) { + ras->ras_sock = sock_alloc(); + if (ras->ras_sock == NULL) { + CERROR("Can't allocate socket: pausing...\n"); + kranal_pause(HZ); + continue; + } + /* XXX this should add a ref to sock->ops->owner, if + * TCP could be a module */ + ras->ras_sock->type = sock->type; + ras->ras_sock->ops = sock->ops; } set_current_state(TASK_INTERRUPTIBLE); - rc = sock->ops->accept(sock, newsock, O_NONBLOCK); + rc = sock->ops->accept(sock, ras->ras_sock, O_NONBLOCK); + /* Sleep for socket activity? */ if (rc == -EAGAIN && kranal_data.kra_listener_shutdown == 0) schedule(); set_current_state(TASK_RUNNING); - if (rc != 0) { - sock_release(newsock); - if (rc != -EAGAIN) { - CERROR("Accept failed: %d\n", rc); - kranal_pause(HZ); - } + if (rc == 0) { + spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); + + list_add_tail(&ras->ras_list, + &kranal_data.kra_connd_acceptq); + + spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); + wake_up(&kranal_data.kra_connd_waitq); + + ras = NULL; continue; - } + } - kranal_conn_handshake(newsock, NULL); - sock_release(newsock); + if (rc != -EAGAIN) { + CERROR("Accept failed: %d, pausing...\n", rc); + kranal_pause(HZ); + } + } + + if (ras != NULL) { + if (ras->ras_sock != NULL) + sock_release(ras->ras_sock); + PORTAL_FREE(ras, sizeof(*ras)); } rc = 0; remove_wait_queue(sock->sk->sk_sleep, &wait); - out_2: + out_1: sock_release(sock); kranal_data.kra_listener_sock = NULL; - out_1: - PORTAL_FREE(connreqs, 2 * sizeof(*connreqs)); out_0: - /* set completion status and unblock thread waiting for me + /* set completion status and unblock thread waiting for me * (parent on startup failure, executioner on normal shutdown) */ kranal_data.kra_listener_shutdown = rc; up(&kranal_data.kra_listener_signal); @@ -1045,7 +1131,7 @@ kranal_start_listener (void) long pid; int rc; - CDEBUG(D_WARNING, "Starting listener\n"); + CDEBUG(D_NET, "Starting listener\n"); /* Called holding kra_nid_mutex: listener stopped */ LASSERT (kranal_data.kra_listener_sock == NULL); @@ -1063,14 +1149,18 @@ kranal_start_listener (void) rc = kranal_data.kra_listener_shutdown; LASSERT ((rc != 0) == (kranal_data.kra_listener_sock == NULL)); - CDEBUG(D_WARNING, "Listener %ld started OK\n", pid); + CDEBUG(D_NET, "Listener %ld started OK\n", pid); return rc; } void -kranal_stop_listener(void) +kranal_stop_listener(int clear_acceptq) { - CDEBUG(D_WARNING, "Stopping listener\n"); + struct list_head zombie_accepts; + unsigned long flags; + kra_acceptsock_t *ras; + + CDEBUG(D_NET, "Stopping listener\n"); /* Called holding kra_nid_mutex: listener running */ LASSERT (kranal_data.kra_listener_sock != NULL); @@ -1082,10 +1172,28 @@ kranal_stop_listener(void) down(&kranal_data.kra_listener_signal); LASSERT (kranal_data.kra_listener_sock == NULL); - CDEBUG(D_WARNING, "Listener stopped\n"); + CDEBUG(D_NET, "Listener stopped\n"); + + if (!clear_acceptq) + return; + + /* Close any unhandled accepts */ + spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); + + list_add(&zombie_accepts, &kranal_data.kra_connd_acceptq); + list_del_init(&kranal_data.kra_connd_acceptq); + + spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); + + while (!list_empty(&zombie_accepts)) { + ras = list_entry(zombie_accepts.next, + kra_acceptsock_t, ras_list); + list_del(&ras->ras_list); + kranal_free_acceptsock(ras); + } } -int +int kranal_listener_procint(ctl_table *table, int write, struct file *filp, void *buffer, size_t *lenp) { @@ -1110,7 +1218,7 @@ kranal_listener_procint(ctl_table *table, int write, struct file *filp, kranal_data.kra_listener_sock == NULL)) { if (kranal_data.kra_listener_sock != NULL) - kranal_stop_listener(); + kranal_stop_listener(0); rc = kranal_start_listener(); @@ -1131,9 +1239,9 @@ kranal_listener_procint(ctl_table *table, int write, struct file *filp, int kranal_set_mynid(ptl_nid_t nid) { - unsigned long flags; - lib_ni_t *ni = &kranal_lib.libnal_ni; - int rc = 0; + unsigned long flags; + lib_ni_t *ni = &kranal_lib.libnal_ni; + int rc = 0; CDEBUG(D_NET, "setting mynid to "LPX64" (old nid="LPX64")\n", nid, ni->ni_pid.nid); @@ -1147,13 +1255,12 @@ kranal_set_mynid(ptl_nid_t nid) } if (kranal_data.kra_listener_sock != NULL) - kranal_stop_listener(); + kranal_stop_listener(1); write_lock_irqsave(&kranal_data.kra_global_lock, flags); kranal_data.kra_peerstamp++; - write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); - ni->ni_pid.nid = nid; + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); /* Delete all existing peers and their connections after new * NID/connstamp set to ensure no old connections in our brave @@ -1188,7 +1295,7 @@ kranal_create_peer (ptl_nid_t nid) INIT_LIST_HEAD(&peer->rap_conns); INIT_LIST_HEAD(&peer->rap_tx_queue); - peer->rap_reconnect_time = CURRENT_TIME; + peer->rap_reconnect_time = CURRENT_SECONDS; peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL; atomic_inc(&kranal_data.kra_npeers); @@ -1269,7 +1376,7 @@ kranal_unlink_peer_locked (kra_peer_t *peer) } int -kranal_get_peer_info (int index, ptl_nid_t *nidp, __u32 *ipp, int *portp, +kranal_get_peer_info (int index, ptl_nid_t *nidp, __u32 *ipp, int *portp, int *persistencep) { kra_peer_t *peer; @@ -1537,7 +1644,7 @@ kranal_cmd(struct portals_cfg *pcfg, void * private) break; } case NAL_CMD_DEL_PEER: { - rc = kranal_del_peer(pcfg->pcfg_nid, + rc = kranal_del_peer(pcfg->pcfg_nid, /* flags == single_share */ pcfg->pcfg_flags != 0); break; @@ -1611,7 +1718,7 @@ kranal_alloc_txdescs(struct list_head *freelist, int n) PORTAL_ALLOC(tx->tx_phys, PTL_MD_MAX_IOV * sizeof(*tx->tx_phys)); if (tx->tx_phys == NULL) { - CERROR("Can't allocate %stx[%d]->tx_phys\n", + CERROR("Can't allocate %stx[%d]->tx_phys\n", isnblk ? "nblk " : "", i); PORTAL_FREE(tx, sizeof(*tx)); @@ -1650,36 +1757,26 @@ kranal_device_init(int id, kra_device_t *dev) goto failed_1; } - rrc = RapkCreatePtag(dev->rad_handle, - &dev->rad_ptag); - if (rrc != RAP_SUCCESS) { - CERROR("Can't create ptag" - " for device %d: %d\n", id, rrc); - goto failed_1; - } - - rrc = RapkCreateCQ(dev->rad_handle, total_ntx, dev->rad_ptag, - &dev->rad_rdma_cq); + rrc = RapkCreateCQ(dev->rad_handle, total_ntx, RAP_CQTYPE_SEND, + &dev->rad_rdma_cqh); if (rrc != RAP_SUCCESS) { CERROR("Can't create rdma cq size %d" " for device %d: %d\n", total_ntx, id, rrc); - goto failed_2; + goto failed_1; } - rrc = RapkCreateCQ(dev->rad_handle, RANAL_FMA_CQ_SIZE, - dev->rad_ptag, &dev->rad_fma_cq); + rrc = RapkCreateCQ(dev->rad_handle, RANAL_FMA_CQ_SIZE, RAP_CQTYPE_RECV, + &dev->rad_fma_cqh); if (rrc != RAP_SUCCESS) { CERROR("Can't create fma cq size %d" " for device %d: %d\n", RANAL_FMA_CQ_SIZE, id, rrc); - goto failed_3; + goto failed_2; } return 0; - failed_3: - RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag); failed_2: - RapkDestroyPtag(dev->rad_handle, dev->rad_ptag); + RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh); failed_1: RapkReleaseDevice(dev->rad_handle); failed_0: @@ -1690,9 +1787,8 @@ void kranal_device_fini(kra_device_t *dev) { LASSERT(dev->rad_scheduler == NULL); - RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cq, dev->rad_ptag); - RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag); - RapkDestroyPtag(dev->rad_handle, dev->rad_ptag); + RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cqh); + RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh); RapkReleaseDevice(dev->rad_handle); } @@ -1701,7 +1797,7 @@ kranal_api_shutdown (nal_t *nal) { int i; unsigned long flags; - + if (nal->nal_refct != 0) { /* This module got the first ref */ PORTAL_MODULE_UNUSE; @@ -1749,13 +1845,19 @@ kranal_api_shutdown (nal_t *nal) break; } + /* Conn/Peer state all cleaned up BEFORE setting shutdown, so threads + * don't have to worry about shutdown races */ + LASSERT (atomic_read(&kranal_data.kra_nconns) == 0); + LASSERT (atomic_read(&kranal_data.kra_npeers) == 0); + /* flag threads to terminate; wake and wait for them to die */ kranal_data.kra_shutdown = 1; for (i = 0; i < kranal_data.kra_ndevs; i++) { kra_device_t *dev = &kranal_data.kra_devices[i]; - LASSERT (list_empty(&dev->rad_connq)); + LASSERT (list_empty(&dev->rad_ready_conns)); + LASSERT (list_empty(&dev->rad_new_conns)); spin_lock_irqsave(&dev->rad_lock, flags); wake_up(&dev->rad_waitq); @@ -1767,9 +1869,9 @@ kranal_api_shutdown (nal_t *nal) spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags); LASSERT (list_empty(&kranal_data.kra_connd_peers)); - spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); + spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); wake_up_all(&kranal_data.kra_connd_waitq); - spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); + spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); i = 2; while (atomic_read(&kranal_data.kra_nthreads) != 0) { @@ -1786,7 +1888,7 @@ kranal_api_shutdown (nal_t *nal) LASSERT (list_empty(&kranal_data.kra_peers[i])); PORTAL_FREE(kranal_data.kra_peers, - sizeof (struct list_head) * + sizeof (struct list_head) * kranal_data.kra_peer_hash_size); } @@ -1796,7 +1898,7 @@ kranal_api_shutdown (nal_t *nal) LASSERT (list_empty(&kranal_data.kra_conns[i])); PORTAL_FREE(kranal_data.kra_conns, - sizeof (struct list_head) * + sizeof (struct list_head) * kranal_data.kra_conn_hash_size); } @@ -1819,8 +1921,6 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid, ptl_ni_limits_t *requested_limits, ptl_ni_limits_t *actual_limits) { - static int device_ids[] = {RAPK_MAIN_DEVICE_ID, - RAPK_EXPANSION_DEVICE_ID}; struct timeval tv; ptl_process_id_t process_id; int pkmem = atomic_read(&portal_kmemory); @@ -1861,7 +1961,8 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid, kra_device_t *dev = &kranal_data.kra_devices[i]; dev->rad_idx = i; - INIT_LIST_HEAD(&dev->rad_connq); + INIT_LIST_HEAD(&dev->rad_ready_conns); + INIT_LIST_HEAD(&dev->rad_new_conns); init_waitqueue_head(&dev->rad_waitq); spin_lock_init(&dev->rad_lock); } @@ -1870,6 +1971,7 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid, init_waitqueue_head(&kranal_data.kra_reaper_waitq); spin_lock_init(&kranal_data.kra_reaper_lock); + INIT_LIST_HEAD(&kranal_data.kra_connd_acceptq); INIT_LIST_HEAD(&kranal_data.kra_connd_peers); init_waitqueue_head(&kranal_data.kra_connd_waitq); spin_lock_init(&kranal_data.kra_connd_lock); @@ -1881,7 +1983,7 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid, /* OK to call kranal_api_shutdown() to cleanup now */ kranal_data.kra_init = RANAL_INIT_DATA; - + kranal_data.kra_peer_hash_size = RANAL_PEER_HASH_SIZE; PORTAL_ALLOC(kranal_data.kra_peers, sizeof(struct list_head) * kranal_data.kra_peer_hash_size); @@ -1929,7 +2031,7 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid, } for (i = 0; i < RANAL_N_CONND; i++) { - rc = kranal_thread_start(kranal_connd, (void *)i); + rc = kranal_thread_start(kranal_connd, (void *)(unsigned long)i); if (rc != 0) { CERROR("Can't spawn ranal connd[%d]: %d\n", i, rc); @@ -1937,14 +2039,25 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid, } } - LASSERT(kranal_data.kra_ndevs == 0); - for (i = 0; i < sizeof(device_ids)/sizeof(device_ids[0]); i++) { + LASSERT (kranal_data.kra_ndevs == 0); + + for (i = 0; i < sizeof(kranal_devids)/sizeof(kranal_devids[0]); i++) { + LASSERT (i < RANAL_MAXDEVS); + dev = &kranal_data.kra_devices[kranal_data.kra_ndevs]; - rc = kranal_device_init(device_ids[i], dev); + rc = kranal_device_init(kranal_devids[i], dev); if (rc == 0) kranal_data.kra_ndevs++; - + } + + if (kranal_data.kra_ndevs == 0) { + CERROR("Can't initialise any RapidArray devices\n"); + goto failed; + } + + for (i = 0; i < kranal_data.kra_ndevs; i++) { + dev = &kranal_data.kra_devices[i]; rc = kranal_thread_start(kranal_scheduler, dev); if (rc != 0) { CERROR("Can't spawn ranal scheduler[%d]: %d\n", @@ -1953,9 +2066,6 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid, } } - if (kranal_data.kra_ndevs == 0) - goto failed; - rc = libcfs_nal_cmd_register(RANAL, &kranal_cmd, NULL); if (rc != 0) { CERROR("Can't initialise command interface (rc = %d)\n", rc); @@ -1973,17 +2083,16 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid, return PTL_OK; failed: - kranal_api_shutdown(&kranal_api); + kranal_api_shutdown(&kranal_api); return PTL_FAIL; } void __exit kranal_module_fini (void) { -#ifdef CONFIG_SYSCTL if (kranal_tunables.kra_sysctl != NULL) unregister_sysctl_table(kranal_tunables.kra_sysctl); -#endif + PtlNIFini(kranal_ni); ptl_unregister_nal(RANAL); @@ -2007,6 +2116,10 @@ kranal_module_init (void) /* Initialise dynamic tunables to defaults once only */ kranal_tunables.kra_timeout = RANAL_TIMEOUT; + kranal_tunables.kra_listener_timeout = RANAL_LISTENER_TIMEOUT; + kranal_tunables.kra_backlog = RANAL_BACKLOG; + kranal_tunables.kra_port = RANAL_PORT; + kranal_tunables.kra_max_immediate = RANAL_MAX_IMMEDIATE; rc = ptl_register_nal(RANAL, &kranal_api); if (rc != PTL_OK) { @@ -2021,11 +2134,15 @@ kranal_module_init (void) return -ENODEV; } -#ifdef CONFIG_SYSCTL - /* Press on regardless even if registering sysctl doesn't work */ - kranal_tunables.kra_sysctl = + kranal_tunables.kra_sysctl = register_sysctl_table(kranal_top_ctl_table, 0); -#endif + if (kranal_tunables.kra_sysctl == NULL) { + CERROR("Can't register sysctl table\n"); + PtlNIFini(kranal_ni); + ptl_unregister_nal(RANAL); + return -ENOMEM; + } + return 0; }