X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lnet%2Fklnds%2Fralnd%2Fralnd.c;h=0da7af4faff1abd17385da2aba6937069fe7ec14;hp=c92482761c04c97ed4b3f7ee418046f7cb7e055d;hb=6815097cbb06aa1a727e6bf7a8ee9e916a33ee6d;hpb=6015bc0f2476bc7ad45283d599a519113102664f diff --git a/lnet/klnds/ralnd/ralnd.c b/lnet/klnds/ralnd/ralnd.c index c924827..0da7af4 100644 --- a/lnet/klnds/ralnd/ralnd.c +++ b/lnet/klnds/ralnd/ralnd.c @@ -22,13 +22,14 @@ */ #include "ranal.h" +static int kranal_devids[] = {RAPK_MAIN_DEVICE_ID, + RAPK_EXPANSION_DEVICE_ID}; nal_t kranal_api; ptl_handle_ni_t kranal_ni; kra_data_t kranal_data; kra_tunables_t kranal_tunables; -#ifdef CONFIG_SYSCTL #define RANAL_SYSCTL_TIMEOUT 1 #define RANAL_SYSCTL_LISTENER_TIMEOUT 2 #define RANAL_SYSCTL_BACKLOG 3 @@ -38,19 +39,19 @@ kra_tunables_t kranal_tunables; #define RANAL_SYSCTL 202 static ctl_table kranal_ctl_table[] = { - {RANAL_SYSCTL_TIMEOUT, "timeout", + {RANAL_SYSCTL_TIMEOUT, "timeout", &kranal_tunables.kra_timeout, sizeof(int), 0644, NULL, &proc_dointvec}, - {RANAL_SYSCTL_LISTENER_TIMEOUT, "listener_timeout", + {RANAL_SYSCTL_LISTENER_TIMEOUT, "listener_timeout", &kranal_tunables.kra_listener_timeout, sizeof(int), 0644, NULL, &proc_dointvec}, - {RANAL_SYSCTL_BACKLOG, "backlog", - &kranal_tunables.kra_backlog, sizeof(int), - 0644, NULL, kranal_listener_procint}, - {RANAL_SYSCTL_PORT, "port", - &kranal_tunables.kra_port, sizeof(int), - 0644, NULL, kranal_listener_procint}, - {RANAL_SYSCTL_MAX_IMMEDIATE, "max_immediate", + {RANAL_SYSCTL_BACKLOG, "backlog", + &kranal_tunables.kra_backlog, sizeof(int), + 0644, NULL, kranal_listener_procint}, + {RANAL_SYSCTL_PORT, "port", + &kranal_tunables.kra_port, sizeof(int), + 0644, NULL, kranal_listener_procint}, + {RANAL_SYSCTL_MAX_IMMEDIATE, "max_immediate", &kranal_tunables.kra_max_immediate, sizeof(int), 0644, NULL, &proc_dointvec}, { 0 } @@ -60,36 +61,41 @@ static ctl_table kranal_top_ctl_table[] = { {RANAL_SYSCTL, "ranal", NULL, 0, 0555, kranal_ctl_table}, { 0 } }; -#endif int kranal_sock_write (struct socket *sock, void *buffer, int nob) { int rc; mm_segment_t oldmm = get_fs(); - struct iovec iov = { - .iov_base = buffer, - .iov_len = nob - }; - struct msghdr msg = { - .msg_name = NULL, - .msg_namelen = 0, - .msg_iov = &iov, - .msg_iovlen = 1, - .msg_control = NULL, - .msg_controllen = 0, - .msg_flags = MSG_DONTWAIT - }; - - /* We've set up the socket's send buffer to be large enough for - * everything we send, so a single non-blocking send should - * complete without error. */ - - set_fs(KERNEL_DS); - rc = sock_sendmsg(sock, &msg, iov.iov_len); - set_fs(oldmm); - - return rc; + struct iovec iov = { + .iov_base = buffer, + .iov_len = nob + }; + struct msghdr msg = { + .msg_name = NULL, + .msg_namelen = 0, + .msg_iov = &iov, + .msg_iovlen = 1, + .msg_control = NULL, + .msg_controllen = 0, + .msg_flags = MSG_DONTWAIT + }; + + /* We've set up the socket's send buffer to be large enough for + * everything we send, so a single non-blocking send should + * complete without error. */ + + set_fs(KERNEL_DS); + rc = sock_sendmsg(sock, &msg, iov.iov_len); + set_fs(oldmm); + + if (rc == nob) + return 0; + + if (rc >= 0) + return -EAGAIN; + + return rc; } int @@ -97,12 +103,12 @@ kranal_sock_read (struct socket *sock, void *buffer, int nob, int timeout) { int rc; mm_segment_t oldmm = get_fs(); - long ticks = timeout * HZ; - unsigned long then; - struct timeval tv; + long ticks = timeout * HZ; + unsigned long then; + struct timeval tv; - LASSERT (nob > 0); - LASSERT (ticks > 0); + LASSERT (nob > 0); + LASSERT (ticks > 0); for (;;) { struct iovec iov = { @@ -119,25 +125,25 @@ kranal_sock_read (struct socket *sock, void *buffer, int nob, int timeout) .msg_flags = 0 }; - /* Set receive timeout to remaining time */ - tv = (struct timeval) { - .tv_sec = ticks / HZ, - .tv_usec = ((ticks % HZ) * 1000000) / HZ - }; - set_fs(KERNEL_DS); - rc = sock_setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, - (char *)&tv, sizeof(tv)); - set_fs(oldmm); - if (rc != 0) { - CERROR("Can't set socket recv timeout %d: %d\n", - timeout, rc); - return rc; - } + /* Set receive timeout to remaining time */ + tv = (struct timeval) { + .tv_sec = ticks / HZ, + .tv_usec = ((ticks % HZ) * 1000000) / HZ + }; + set_fs(KERNEL_DS); + rc = sock_setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, + (char *)&tv, sizeof(tv)); + set_fs(oldmm); + if (rc != 0) { + CERROR("Can't set socket recv timeout %d: %d\n", + timeout, rc); + return rc; + } set_fs(KERNEL_DS); - then = jiffies; + then = jiffies; rc = sock_recvmsg(sock, &msg, iov.iov_len, 0); - ticks -= jiffies - then; + ticks -= jiffies - then; set_fs(oldmm); if (rc < 0) @@ -149,78 +155,79 @@ kranal_sock_read (struct socket *sock, void *buffer, int nob, int timeout) buffer = ((char *)buffer) + rc; nob -= rc; - if (nob == 0) - return 0; + if (nob == 0) + return 0; - if (ticks <= 0) - return -ETIMEDOUT; + if (ticks <= 0) + return -ETIMEDOUT; } } int kranal_create_sock(struct socket **sockp) { - struct socket *sock; - int rc; - struct timeval tv; - int option; + struct socket *sock; + int rc; + int option; mm_segment_t oldmm = get_fs(); - rc = sock_create(PF_INET, SOCK_STREAM, 0, &sock); - if (rc != 0) { - CERROR("Can't create socket: %d\n", rc); - return rc; - } - - /* Ensure sending connection info doesn't block */ - option = 2 * sizeof(kra_connreq_t); - set_fs(KERNEL_DS); - rc = sock_setsockopt(sock, SOL_SOCKET, SO_SNDBUF, - (char *)&option, sizeof(option)); - set_fs(oldmm); - if (rc != 0) { - CERROR("Can't set send buffer %d: %d\n", option, rc); - goto failed; - } - - option = 1; - set_fs(KERNEL_DS); - rc = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, - (char *)&option, sizeof(option)); - set_fs(oldmm); - if (rc != 0) { - CERROR("Can't set SO_REUSEADDR: %d\n", rc); - goto failed; - } - - *sockp = sock; - return 0; + rc = sock_create(PF_INET, SOCK_STREAM, 0, &sock); + if (rc != 0) { + CERROR("Can't create socket: %d\n", rc); + return rc; + } + + /* Ensure sending connection info doesn't block */ + option = 2 * sizeof(kra_connreq_t); + set_fs(KERNEL_DS); + rc = sock_setsockopt(sock, SOL_SOCKET, SO_SNDBUF, + (char *)&option, sizeof(option)); + set_fs(oldmm); + if (rc != 0) { + CERROR("Can't set send buffer %d: %d\n", option, rc); + goto failed; + } + + option = 1; + set_fs(KERNEL_DS); + rc = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + (char *)&option, sizeof(option)); + set_fs(oldmm); + if (rc != 0) { + CERROR("Can't set SO_REUSEADDR: %d\n", rc); + goto failed; + } + + *sockp = sock; + return 0; failed: - sock_release(sock); - return rc; + sock_release(sock); + return rc; } void kranal_pause(int ticks) { - set_current_state(TASK_UNINTERRUPTIBLE); - schedule_timeout(ticks); + set_current_state(TASK_UNINTERRUPTIBLE); + schedule_timeout(ticks); } void -kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn) +kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn, ptl_nid_t dstnid) { RAP_RETURN rrc; memset(connreq, 0, sizeof(*connreq)); - connreq->racr_magic = RANAL_MSG_MAGIC; - connreq->racr_version = RANAL_MSG_VERSION; - connreq->racr_devid = conn->rac_device->rad_id; - connreq->racr_nid = kranal_lib.libnal_ni.ni_pid.nid; - connreq->racr_timeout = conn->rac_timeout; - connreq->racr_incarnation = conn->rac_my_incarnation; + connreq->racr_magic = RANAL_MSG_MAGIC; + connreq->racr_version = RANAL_MSG_VERSION; + connreq->racr_devid = conn->rac_device->rad_id; + connreq->racr_srcnid = kranal_lib.libnal_ni.ni_pid.nid; + connreq->racr_dstnid = dstnid; + connreq->racr_peerstamp = kranal_data.kra_peerstamp; + connreq->racr_connstamp = conn->rac_my_connstamp; + connreq->racr_timeout = conn->rac_timeout; rrc = RapkGetRiParams(conn->rac_rihandle, &connreq->racr_riparams); LASSERT(rrc == RAP_SUCCESS); @@ -229,40 +236,42 @@ kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn) int kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout) { - int i; - int rc; - - rc = kranal_sock_read(sock, connreq, sizeof(*connreq), timeout); - if (rc != 0) { - CERROR("Read failed: %d\n", rc); - return rc; - } - - if (connreq->racr_magic != RANAL_MSG_MAGIC) { - if (__swab32(connreq->racr_magic) != RANAL_MSG_MAGIC) { - CERROR("Unexpected magic %08x\n", connreq->racr_magic); - return -EPROTO; - } - - __swab32s(&connreq->racr_magic); - __swab16s(&connreq->racr_version); + int rc; + + rc = kranal_sock_read(sock, connreq, sizeof(*connreq), timeout); + if (rc != 0) { + CERROR("Read failed: %d\n", rc); + return rc; + } + + if (connreq->racr_magic != RANAL_MSG_MAGIC) { + if (__swab32(connreq->racr_magic) != RANAL_MSG_MAGIC) { + CERROR("Unexpected magic %08x\n", connreq->racr_magic); + return -EPROTO; + } + + __swab32s(&connreq->racr_magic); + __swab16s(&connreq->racr_version); __swab16s(&connreq->racr_devid); - __swab64s(&connreq->racr_nid); - __swab64s(&connreq->racr_incarnation); + __swab64s(&connreq->racr_srcnid); + __swab64s(&connreq->racr_dstnid); + __swab64s(&connreq->racr_peerstamp); + __swab64s(&connreq->racr_connstamp); __swab32s(&connreq->racr_timeout); - __swab32s(&connreq->racr_riparams.FmaDomainHndl); - __swab32s(&connreq->racr_riparams.RcvCqHndl); - __swab32s(&connreq->racr_riparams.PTag); + __swab32s(&connreq->racr_riparams.HostId); + __swab32s(&connreq->racr_riparams.FmaDomainHndl); + __swab32s(&connreq->racr_riparams.PTag); __swab32s(&connreq->racr_riparams.CompletionCookie); - } + } - if (connreq->racr_version != RANAL_MSG_VERSION) { - CERROR("Unexpected version %d\n", connreq->racr_version); - return -EPROTO; - } + if (connreq->racr_version != RANAL_MSG_VERSION) { + CERROR("Unexpected version %d\n", connreq->racr_version); + return -EPROTO; + } - if (connreq->racr_nid == PTL_NID_ANY) { + if (connreq->racr_srcnid == PTL_NID_ANY || + connreq->racr_dstnid == PTL_NID_ANY) { CERROR("Received PTL_NID_ANY\n"); return -EPROTO; } @@ -272,47 +281,104 @@ kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout) connreq->racr_timeout, RANAL_MIN_TIMEOUT); return -EPROTO; } - - for (i = 0; i < kranal_data.kra_ndevs; i++) - if (connreq->racr_devid == - kranal_data.kra_devices[i].rad_id) - break; - if (i == kranal_data.kra_ndevs) { - CERROR("Can't match device %d\n", connreq->racr_devid); - return -ENODEV; + return 0; +} + +int +kranal_close_stale_conns_locked (kra_peer_t *peer, kra_conn_t *newconn) +{ + kra_conn_t *conn; + struct list_head *ctmp; + struct list_head *cnxt; + int loopback; + int count = 0; + + loopback = peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid; + + list_for_each_safe (ctmp, cnxt, &peer->rap_conns) { + conn = list_entry(ctmp, kra_conn_t, rac_list); + + if (conn == newconn) + continue; + + if (conn->rac_peerstamp != newconn->rac_peerstamp) { + CDEBUG(D_NET, "Closing stale conn nid:"LPX64 + " peerstamp:"LPX64"("LPX64")\n", peer->rap_nid, + conn->rac_peerstamp, newconn->rac_peerstamp); + LASSERT (conn->rac_peerstamp < newconn->rac_peerstamp); + count++; + kranal_close_conn_locked(conn, -ESTALE); + continue; + } + + if (conn->rac_device != newconn->rac_device) + continue; + + if (loopback && + newconn->rac_my_connstamp == conn->rac_peer_connstamp && + newconn->rac_peer_connstamp == conn->rac_my_connstamp) + continue; + + LASSERT (conn->rac_peer_connstamp < newconn->rac_peer_connstamp); + + CDEBUG(D_NET, "Closing stale conn nid:"LPX64 + " connstamp:"LPX64"("LPX64")\n", peer->rap_nid, + conn->rac_peer_connstamp, newconn->rac_peer_connstamp); + + count++; + kranal_close_conn_locked(conn, -ESTALE); } - return 0; + return count; } int -kranal_conn_isdup_locked(kra_peer_t *peer, __u64 incarnation) +kranal_conn_isdup_locked(kra_peer_t *peer, kra_conn_t *newconn) { - kra_conn_t *conn; - struct list_head *tmp; - int loopback = 0; + kra_conn_t *conn; + struct list_head *tmp; + int loopback; + + loopback = peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid; + + list_for_each(tmp, &peer->rap_conns) { + conn = list_entry(tmp, kra_conn_t, rac_list); - list_for_each(tmp, &peer->rap_conns) { - conn = list_entry(tmp, kra_conn_t, rac_list); + /* 'newconn' is from an earlier version of 'peer'!!! */ + if (newconn->rac_peerstamp < conn->rac_peerstamp) + return 1; - if (conn->rac_peer_incarnation < incarnation) { - /* Conns with an older incarnation get culled later */ + /* 'conn' is from an earlier version of 'peer': it will be + * removed when we cull stale conns later on... */ + if (newconn->rac_peerstamp > conn->rac_peerstamp) continue; - } - if (!loopback && - conn->rac_peer_incarnation == incarnation && - peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid) { - /* loopback creates 2 conns */ - loopback = 1; + /* Different devices are OK */ + if (conn->rac_device != newconn->rac_device) + continue; + + /* It's me connecting to myself */ + if (loopback && + newconn->rac_my_connstamp == conn->rac_peer_connstamp && + newconn->rac_peer_connstamp == conn->rac_my_connstamp) continue; - } - return 1; - } + /* 'newconn' is an earlier connection from 'peer'!!! */ + if (newconn->rac_peer_connstamp < conn->rac_peer_connstamp) + return 2; - return 0; + /* 'conn' is an earlier connection from 'peer': it will be + * removed when we cull stale conns later on... */ + if (newconn->rac_peer_connstamp > conn->rac_peer_connstamp) + continue; + + /* 'newconn' has the SAME connection stamp; 'peer' isn't + * playing the game... */ + return 3; + } + + return 0; } void @@ -322,45 +388,44 @@ kranal_set_conn_uniqueness (kra_conn_t *conn) write_lock_irqsave(&kranal_data.kra_global_lock, flags); - conn->rac_my_incarnation = kranal_data.kra_next_incarnation++; + conn->rac_my_connstamp = kranal_data.kra_connstamp++; do { /* allocate a unique cqid */ conn->rac_cqid = kranal_data.kra_next_cqid++; } while (kranal_cqid2conn_locked(conn->rac_cqid) != NULL); - write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); } int -kranal_alloc_conn(kra_conn_t **connp, kra_device_t *dev) +kranal_create_conn(kra_conn_t **connp, kra_device_t *dev) { - kra_conn_t *conn; + kra_conn_t *conn; RAP_RETURN rrc; LASSERT (!in_interrupt()); - PORTAL_ALLOC(conn, sizeof(*conn)); + PORTAL_ALLOC(conn, sizeof(*conn)); - if (conn == NULL) - return -ENOMEM; + if (conn == NULL) + return -ENOMEM; - memset(conn, 0, sizeof(*conn)); - atomic_set(&conn->rac_refcount, 1); - INIT_LIST_HEAD(&conn->rac_list); - INIT_LIST_HEAD(&conn->rac_hashlist); - INIT_LIST_HEAD(&conn->rac_fmaq); - INIT_LIST_HEAD(&conn->rac_rdmaq); - INIT_LIST_HEAD(&conn->rac_replyq); - spin_lock_init(&conn->rac_lock); + memset(conn, 0, sizeof(*conn)); + atomic_set(&conn->rac_refcount, 1); + INIT_LIST_HEAD(&conn->rac_list); + INIT_LIST_HEAD(&conn->rac_hashlist); + INIT_LIST_HEAD(&conn->rac_schedlist); + INIT_LIST_HEAD(&conn->rac_fmaq); + INIT_LIST_HEAD(&conn->rac_rdmaq); + INIT_LIST_HEAD(&conn->rac_replyq); + spin_lock_init(&conn->rac_lock); kranal_set_conn_uniqueness(conn); + conn->rac_device = dev; conn->rac_timeout = MAX(kranal_tunables.kra_timeout, RANAL_MIN_TIMEOUT); kranal_update_reaper_timeout(conn->rac_timeout); rrc = RapkCreateRi(dev->rad_handle, conn->rac_cqid, - dev->rad_ptag, - dev->rad_rdma_cq, dev->rad_fma_cq, &conn->rac_rihandle); if (rrc != RAP_SUCCESS) { CERROR("RapkCreateRi failed: %d\n", rrc); @@ -369,39 +434,25 @@ kranal_alloc_conn(kra_conn_t **connp, kra_device_t *dev) } atomic_inc(&kranal_data.kra_nconns); - *connp = conn; - return 0; + *connp = conn; + return 0; } void -__kranal_conn_decref(kra_conn_t *conn) +kranal_destroy_conn(kra_conn_t *conn) { - kra_tx_t *tx; RAP_RETURN rrc; LASSERT (!in_interrupt()); LASSERT (!conn->rac_scheduled); LASSERT (list_empty(&conn->rac_list)); LASSERT (list_empty(&conn->rac_hashlist)); + LASSERT (list_empty(&conn->rac_schedlist)); LASSERT (atomic_read(&conn->rac_refcount) == 0); - - while (!list_empty(&conn->rac_fmaq)) { - tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list); - - list_del(&tx->tx_list); - kranal_tx_done(tx, -ECONNABORTED); - } - - /* We may not destroy this connection while it has RDMAs outstanding */ + LASSERT (list_empty(&conn->rac_fmaq)); LASSERT (list_empty(&conn->rac_rdmaq)); + LASSERT (list_empty(&conn->rac_replyq)); - while (!list_empty(&conn->rac_replyq)) { - tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list); - - list_del(&tx->tx_list); - kranal_tx_done(tx, -ECONNABORTED); - } - rrc = RapkDestroyRi(conn->rac_device->rad_handle, conn->rac_rihandle); LASSERT (rrc == RAP_SUCCESS); @@ -409,25 +460,27 @@ __kranal_conn_decref(kra_conn_t *conn) if (conn->rac_peer != NULL) kranal_peer_decref(conn->rac_peer); - PORTAL_FREE(conn, sizeof(*conn)); + PORTAL_FREE(conn, sizeof(*conn)); atomic_dec(&kranal_data.kra_nconns); } void kranal_terminate_conn_locked (kra_conn_t *conn) { - kra_peer_t *peer = conn->rac_peer; - LASSERT (!in_interrupt()); - LASSERT (conn->rac_closing); + LASSERT (conn->rac_state == RANAL_CONN_CLOSING); LASSERT (!list_empty(&conn->rac_hashlist)); LASSERT (list_empty(&conn->rac_list)); - /* Remove from conn hash table (no new callbacks) */ + /* Remove from conn hash table: no new callbacks */ list_del_init(&conn->rac_hashlist); kranal_conn_decref(conn); - /* Conn is now just waiting for remaining refs to go */ + conn->rac_state = RANAL_CONN_CLOSED; + + /* schedule to clear out all uncompleted comms in context of dev's + * scheduler */ + kranal_schedule_conn(conn); } void @@ -436,10 +489,10 @@ kranal_close_conn_locked (kra_conn_t *conn, int error) kra_peer_t *peer = conn->rac_peer; CDEBUG(error == 0 ? D_NET : D_ERROR, - "closing conn to "LPX64": error %d\n", peer->rap_nid, error); + "closing conn to "LPX64": error %d\n", peer->rap_nid, error); LASSERT (!in_interrupt()); - LASSERT (!conn->rac_closing); + LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED); LASSERT (!list_empty(&conn->rac_hashlist)); LASSERT (!list_empty(&conn->rac_list)); @@ -451,8 +504,15 @@ kranal_close_conn_locked (kra_conn_t *conn, int error) kranal_unlink_peer_locked(peer); } - conn->rac_closing = 1; - kranal_schedule_conn(conn); + /* Reset RX timeout to ensure we wait for an incoming CLOSE for the + * full timeout. If we get a CLOSE we know the peer has stopped all + * RDMA. Otherwise if we wait for the full timeout we can also be sure + * all RDMA has stopped. */ + conn->rac_last_rx = jiffies; + mb(); + + conn->rac_state = RANAL_CONN_CLOSING; + kranal_schedule_conn(conn); /* schedule sending CLOSE */ kranal_conn_decref(conn); /* lose peer's ref */ } @@ -461,34 +521,70 @@ void kranal_close_conn (kra_conn_t *conn, int error) { unsigned long flags; - + write_lock_irqsave(&kranal_data.kra_global_lock, flags); - - if (!conn->rac_closing) + + if (conn->rac_state == RANAL_CONN_ESTABLISHED) kranal_close_conn_locked(conn, error); - + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); } int -kranal_passive_conn_handshake (struct socket *sock, - ptl_nid_t *peer_nidp, kra_conn_t **connp) +kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq, + __u32 peer_ip, int peer_port) { - struct sockaddr_in addr; - __u32 peer_ip; + kra_device_t *dev = conn->rac_device; + unsigned long flags; + RAP_RETURN rrc; + + /* CAVEAT EMPTOR: we're really overloading rac_last_tx + rac_keepalive + * to do RapkCompleteSync() timekeeping (see kibnal_scheduler). */ + conn->rac_last_tx = jiffies; + conn->rac_keepalive = 0; + + /* Schedule conn on rad_new_conns */ + kranal_conn_addref(conn); + spin_lock_irqsave(&dev->rad_lock, flags); + list_add_tail(&conn->rac_schedlist, &dev->rad_new_conns); + wake_up(&dev->rad_waitq); + spin_unlock_irqrestore(&dev->rad_lock, flags); + + rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams); + if (rrc != RAP_SUCCESS) { + CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n", + HIPQUAD(peer_ip), peer_port, rrc); + return -ECONNABORTED; + } + + /* Scheduler doesn't touch conn apart from to deschedule and decref it + * after RapkCompleteSync() return success, so conn is all mine */ + + conn->rac_peerstamp = connreq->racr_peerstamp; + conn->rac_peer_connstamp = connreq->racr_connstamp; + conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq->racr_timeout); + kranal_update_reaper_timeout(conn->rac_keepalive); + return 0; +} + +int +kranal_passive_conn_handshake (struct socket *sock, ptl_nid_t *src_nidp, + ptl_nid_t *dst_nidp, kra_conn_t **connp) +{ + struct sockaddr_in addr; + __u32 peer_ip; unsigned int peer_port; - kra_connreq_t connreq; - ptl_nid_t peer_nid; + kra_connreq_t rx_connreq; + kra_connreq_t tx_connreq; kra_conn_t *conn; kra_device_t *dev; - RAP_RETURN rrc; - int rc; + int rc; int len; int i; len = sizeof(addr); - rc = sock->ops->getname(sock, (struct sockaddr *)&addr, &len, 2); + rc = sock->ops->getname(sock, (struct sockaddr *)&addr, &len, 2); if (rc != 0) { CERROR("Can't get peer's IP: %d\n", rc); return rc; @@ -503,51 +599,48 @@ kranal_passive_conn_handshake (struct socket *sock, return -ECONNREFUSED; } - rc = kranal_recv_connreq(sock, &connreq, + rc = kranal_recv_connreq(sock, &rx_connreq, kranal_tunables.kra_listener_timeout); if (rc != 0) { - CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", + CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", HIPQUAD(peer_ip), peer_port, rc); return rc; } - peer_nid = connreq.racr_nid; - LASSERT (peer_nid != PTL_NID_ANY); - for (i = 0;;i++) { - LASSERT(i < kranal_data.kra_ndevs); + if (i == kranal_data.kra_ndevs) { + CERROR("Can't match dev %d from %u.%u.%u.%u/%d\n", + rx_connreq.racr_devid, HIPQUAD(peer_ip), peer_port); + return -ENODEV; + } dev = &kranal_data.kra_devices[i]; - if (dev->rad_id == connreq.racr_devid) + if (dev->rad_id == rx_connreq.racr_devid) break; } - rc = kranal_alloc_conn(&conn, dev); + rc = kranal_create_conn(&conn, dev); if (rc != 0) return rc; - conn->rac_peer_incarnation = connreq.racr_incarnation; - conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq.racr_timeout); - kranal_update_reaper_timeout(conn->rac_keepalive); - - rrc = RapkSetRiParams(conn->rac_rihandle, &connreq.racr_riparams); - if (rrc != RAP_SUCCESS) { - CERROR("Can't set riparams for "LPX64": %d\n", peer_nid, rrc); + kranal_pack_connreq(&tx_connreq, conn, rx_connreq.racr_srcnid); + + rc = kranal_sock_write(sock, &tx_connreq, sizeof(tx_connreq)); + if (rc != 0) { + CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n", + HIPQUAD(peer_ip), peer_port, rc); kranal_conn_decref(conn); - return -EPROTO; + return rc; } - kranal_pack_connreq(&connreq, conn); - - rc = kranal_sock_write(sock, &connreq, sizeof(connreq)); + rc = kranal_set_conn_params(conn, &rx_connreq, peer_ip, peer_port); if (rc != 0) { - CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n", - HIPQUAD(peer_ip), peer_port, rc); kranal_conn_decref(conn); return rc; } *connp = conn; - *peer_nidp = peer_nid; + *src_nidp = rx_connreq.racr_srcnid; + *dst_nidp = rx_connreq.racr_dstnid; return 0; } @@ -559,14 +652,11 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp) struct socket *sock; unsigned int port; int rc; - int option; - mm_segment_t oldmm = get_fs(); - struct timeval tv; for (port = 1023; port >= 512; port--) { - memset(&locaddr, 0, sizeof(locaddr)); - locaddr.sin_family = AF_INET; + memset(&locaddr, 0, sizeof(locaddr)); + locaddr.sin_family = AF_INET; locaddr.sin_port = htons(port); locaddr.sin_addr.s_addr = htonl(INADDR_ANY); @@ -583,7 +673,7 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp) (struct sockaddr *)&locaddr, sizeof(locaddr)); if (rc != 0) { sock_release(sock); - + if (rc == -EADDRINUSE) { CDEBUG(D_NET, "Port %d already in use\n", port); continue; @@ -600,7 +690,7 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp) *sockp = sock; return 0; } - + sock_release(sock); if (rc != -EADDRNOTAVAIL) { @@ -608,8 +698,8 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp) port, HIPQUAD(peer->rap_ip), peer->rap_port, rc); return rc; } - - CDEBUG(D_NET, "Port %d not available for %u.%u.%u.%u/%d\n", + + CDEBUG(D_NET, "Port %d not available for %u.%u.%u.%u/%d\n", port, HIPQUAD(peer->rap_ip), peer->rap_port); } @@ -619,26 +709,27 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp) int -kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp) +kranal_active_conn_handshake(kra_peer_t *peer, + ptl_nid_t *dst_nidp, kra_conn_t **connp) { - struct sockaddr_in dstaddr; - kra_connreq_t connreq; + kra_connreq_t connreq; kra_conn_t *conn; kra_device_t *dev; struct socket *sock; - RAP_RETURN rrc; - int rc; - int idx; - - idx = peer->rap_nid & 0x7fffffff; + int rc; + unsigned int idx; + + /* spread connections over all devices using both peer NIDs to ensure + * all nids use all devices */ + idx = peer->rap_nid + kranal_lib.libnal_ni.ni_pid.nid; dev = &kranal_data.kra_devices[idx % kranal_data.kra_ndevs]; - rc = kranal_alloc_conn(&conn, dev); + rc = kranal_create_conn(&conn, dev); if (rc != 0) return rc; - kranal_pack_connreq(&connreq, conn); - + kranal_pack_connreq(&connreq, conn, peer->rap_nid); + rc = ranal_connect_sock(peer, &sock); if (rc != 0) goto failed_0; @@ -649,14 +740,14 @@ kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp) rc = kranal_sock_write(sock, &connreq, sizeof(connreq)); if (rc != 0) { - CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n", + CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n", HIPQUAD(peer->rap_ip), peer->rap_port, rc); goto failed_1; } rc = kranal_recv_connreq(sock, &connreq, kranal_tunables.kra_timeout); if (rc != 0) { - CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", + CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", HIPQUAD(peer->rap_ip), peer->rap_port, rc); goto failed_1; } @@ -664,36 +755,30 @@ kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp) sock_release(sock); rc = -EPROTO; - if (connreq.racr_nid != peer->rap_nid) { - CERROR("Unexpected nid from %u.%u.%u.%u/%d: " + if (connreq.racr_srcnid != peer->rap_nid) { + CERROR("Unexpected srcnid from %u.%u.%u.%u/%d: " "received "LPX64" expected "LPX64"\n", - HIPQUAD(peer->rap_ip), peer->rap_port, - connreq.racr_nid, peer->rap_nid); + HIPQUAD(peer->rap_ip), peer->rap_port, + connreq.racr_srcnid, peer->rap_nid); goto failed_0; } if (connreq.racr_devid != dev->rad_id) { CERROR("Unexpected device id from %u.%u.%u.%u/%d: " "received %d expected %d\n", - HIPQUAD(peer->rap_ip), peer->rap_port, + HIPQUAD(peer->rap_ip), peer->rap_port, connreq.racr_devid, dev->rad_id); goto failed_0; } - conn->rac_peer_incarnation = connreq.racr_incarnation; - conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq.racr_timeout); - kranal_update_reaper_timeout(conn->rac_keepalive); - - rc = -ENETDOWN; - rrc = RapkSetRiParams(conn->rac_rihandle, &connreq.racr_riparams); - if (rrc != RAP_SUCCESS) { - CERROR("Can't set riparams for "LPX64": %d\n", - peer->rap_nid, rrc); + rc = kranal_set_conn_params(conn, &connreq, + peer->rap_ip, peer->rap_port); + if (rc != 0) goto failed_0; - } *connp = conn; - return 0; + *dst_nidp = connreq.racr_dstnid; + return 0; failed_1: sock_release(sock); @@ -707,85 +792,104 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer) { kra_peer_t *peer2; kra_tx_t *tx; - ptl_nid_t peer_nid; - unsigned long flags; - unsigned long timeout; - kra_conn_t *conn; - int rc; + ptl_nid_t peer_nid; + ptl_nid_t dst_nid; + unsigned long flags; + kra_conn_t *conn; + int rc; int nstale; + int new_peer = 0; - if (sock != NULL) { - /* passive: listener accepted sock */ - LASSERT (peer == NULL); + if (sock == NULL) { + /* active: connd wants to connect to 'peer' */ + LASSERT (peer != NULL); + LASSERT (peer->rap_connecting); - rc = kranal_passive_conn_handshake(sock, &peer_nid, &conn); + rc = kranal_active_conn_handshake(peer, &dst_nid, &conn); if (rc != 0) return rc; - /* assume this is a new peer */ - peer = kranal_create_peer(peer_nid); - if (peer == NULL) { - CERROR("Can't allocate peer for "LPX64"\n", peer_nid); + write_lock_irqsave(&kranal_data.kra_global_lock, flags); + + if (!kranal_peer_active(peer)) { + /* raced with peer getting unlinked */ + write_unlock_irqrestore(&kranal_data.kra_global_lock, + flags); kranal_conn_decref(conn); - return -ENOMEM; - } - - write_lock_irqsave(&kranal_data.kra_global_lock, flags); - - peer2 = kranal_find_peer_locked(peer_nid); - if (peer2 == NULL) { - /* peer table takes my initial ref on peer */ - list_add_tail(&peer->rap_list, - kranal_nid2peerlist(peer_nid)); - } else { - /* peer_nid already in the peer table */ - kranal_peer_decref(peer); - peer = peer2; - } - /* NB I may now have a non-persistent peer in the peer - * table with no connections: I can't drop the global lock - * until I've given it a connection or removed it, and when - * I do 'peer' can disappear under me. */ + return -ESTALE; + } + + peer_nid = peer->rap_nid; } else { - /* active: connd wants to connect to peer */ - LASSERT (peer != NULL); - LASSERT (peer->rap_connecting); - - rc = kranal_active_conn_handshake(peer, &conn); + /* passive: listener accepted 'sock' */ + LASSERT (peer == NULL); + + rc = kranal_passive_conn_handshake(sock, &peer_nid, + &dst_nid, &conn); if (rc != 0) return rc; - write_lock_irqsave(&kranal_data.kra_global_lock, flags); - - if (!kranal_peer_active(peer)) { - /* raced with peer getting unlinked */ - write_unlock_irqrestore(&kranal_data.kra_global_lock, - flags); + /* assume this is a new peer */ + peer = kranal_create_peer(peer_nid); + if (peer == NULL) { + CERROR("Can't allocate peer for "LPX64"\n", peer_nid); kranal_conn_decref(conn); - return ESTALE; - } - } - - LASSERT (kranal_peer_active(peer)); /* peer is in the peer table */ - peer_nid = peer->rap_nid; - - /* Refuse to duplicate an existing connection (both sides might try - * to connect at once). NB we return success! We _do_ have a - * connection (so we don't need to remove the peer from the peer - * table) and we _don't_ have any blocked txs to complete */ - if (kranal_conn_isdup_locked(peer, conn->rac_peer_incarnation)) { + return -ENOMEM; + } + + write_lock_irqsave(&kranal_data.kra_global_lock, flags); + + peer2 = kranal_find_peer_locked(peer_nid); + if (peer2 == NULL) { + new_peer = 1; + } else { + /* peer_nid already in the peer table */ + kranal_peer_decref(peer); + peer = peer2; + } + } + + LASSERT ((!new_peer) != (!kranal_peer_active(peer))); + + /* Refuse connection if peer thinks we are a different NID. We check + * this while holding the global lock, to synch with connection + * destruction on NID change. */ + if (dst_nid != kranal_lib.libnal_ni.ni_pid.nid) { + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); + + CERROR("Stale/bad connection with "LPX64 + ": dst_nid "LPX64", expected "LPX64"\n", + peer_nid, dst_nid, kranal_lib.libnal_ni.ni_pid.nid); + rc = -ESTALE; + goto failed; + } + + /* Refuse to duplicate an existing connection (both sides might try to + * connect at once). NB we return success! We _are_ connected so we + * _don't_ have any blocked txs to complete with failure. */ + rc = kranal_conn_isdup_locked(peer, conn); + if (rc != 0) { LASSERT (!list_empty(&peer->rap_conns)); LASSERT (list_empty(&peer->rap_tx_queue)); write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); - CWARN("Not creating duplicate connection to "LPX64"\n", - peer_nid); - kranal_conn_decref(conn); - return 0; - } + CWARN("Not creating duplicate connection to "LPX64": %d\n", + peer_nid, rc); + rc = 0; + goto failed; + } + + if (new_peer) { + /* peer table takes my ref on the new peer */ + list_add_tail(&peer->rap_list, + kranal_nid2peerlist(peer_nid)); + } + + /* initialise timestamps before reaper looks at them */ + conn->rac_last_tx = conn->rac_last_rx = jiffies; - kranal_peer_addref(peer); /* +1 ref for conn */ - conn->rac_peer = peer; - list_add_tail(&conn->rac_list, &peer->rap_conns); + kranal_peer_addref(peer); /* +1 ref for conn */ + conn->rac_peer = peer; + list_add_tail(&conn->rac_list, &peer->rap_conns); kranal_conn_addref(conn); /* +1 ref for conn table */ list_add_tail(&conn->rac_hashlist, @@ -793,26 +897,35 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer) /* Schedule all packets blocking for a connection */ while (!list_empty(&peer->rap_tx_queue)) { - tx = list_entry(&peer->rap_tx_queue.next, + tx = list_entry(peer->rap_tx_queue.next, kra_tx_t, tx_list); list_del(&tx->tx_list); kranal_post_fma(conn, tx); } - nstale = kranal_close_stale_conns_locked(peer, conn->rac_peer_incarnation); + nstale = kranal_close_stale_conns_locked(peer, conn); - write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); /* CAVEAT EMPTOR: passive peer can disappear NOW */ if (nstale != 0) CWARN("Closed %d stale conns to "LPX64"\n", nstale, peer_nid); + CDEBUG(D_WARNING, "New connection to "LPX64" on devid[%d] = %d\n", + peer_nid, conn->rac_device->rad_idx, conn->rac_device->rad_id); + /* Ensure conn gets checked. Transmits may have been queued and an * FMA event may have happened before it got in the cq hash table */ kranal_schedule_conn(conn); - return 0; + return 0; + + failed: + if (new_peer) + kranal_peer_decref(peer); + kranal_conn_decref(conn); + return rc; } void @@ -825,8 +938,12 @@ kranal_connect (kra_peer_t *peer) LASSERT (peer->rap_connecting); + CDEBUG(D_NET, "About to handshake "LPX64"\n", peer->rap_nid); + rc = kranal_conn_handshake(NULL, peer); + CDEBUG(D_NET, "Done handshake "LPX64":%d \n", peer->rap_nid, rc); + write_lock_irqsave(&kranal_data.kra_global_lock, flags); LASSERT (peer->rap_connecting); @@ -839,14 +956,14 @@ kranal_connect (kra_peer_t *peer) /* reset reconnection timeouts */ peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL; - peer->rap_reconnect_time = CURRENT_TIME; + peer->rap_reconnect_time = CURRENT_SECONDS; write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); return; } LASSERT (peer->rap_reconnect_interval != 0); - peer->rap_reconnect_time = CURRENT_TIME + peer->rap_reconnect_interval; + peer->rap_reconnect_time = CURRENT_SECONDS + peer->rap_reconnect_interval; peer->rap_reconnect_interval = MAX(RANAL_MAX_RECONNECT_INTERVAL, 1 * peer->rap_reconnect_interval); @@ -871,199 +988,260 @@ kranal_connect (kra_peer_t *peer) } while (!list_empty(&zombies)); } +void +kranal_free_acceptsock (kra_acceptsock_t *ras) +{ + sock_release(ras->ras_sock); + PORTAL_FREE(ras, sizeof(*ras)); +} + int -kranal_listener(void *arg) +kranal_listener (void *arg) { - struct sockaddr_in addr; - wait_queue_t wait; - struct socket *sock; - struct socket *newsock; - int port; - kra_connreq_t *connreqs; - char name[16]; + struct sockaddr_in addr; + wait_queue_t wait; + struct socket *sock; + kra_acceptsock_t *ras; + int port; + char name[16]; int rc; + unsigned long flags; - /* Parent thread holds kra_nid_mutex, and is, or is about to - * block on kra_listener_signal */ - - port = kranal_tunables.kra_port; - snprintf(name, sizeof(name), "kranal_lstn%03d", port); - kportal_daemonize(name); - kportal_blockallsigs(); + /* Parent thread holds kra_nid_mutex, and is, or is about to + * block on kra_listener_signal */ - init_waitqueue_entry(&wait, current); + port = kranal_tunables.kra_port; + snprintf(name, sizeof(name), "kranal_lstn%03d", port); + kportal_daemonize(name); + kportal_blockallsigs(); - rc = -ENOMEM; - PORTAL_ALLOC(connreqs, 2 * sizeof(*connreqs)); - if (connreqs == NULL) - goto out_0; + init_waitqueue_entry(&wait, current); - rc = kranal_create_sock(&sock); - if (rc != 0) - goto out_1; + rc = kranal_create_sock(&sock); + if (rc != 0) + goto out_0; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = INADDR_ANY; - rc = sock->ops->bind(sock, (struct sockaddr *)&addr, sizeof(addr)); - if (rc != 0) { - CERROR("Can't bind to port %d\n", port); - goto out_2; - } + rc = sock->ops->bind(sock, (struct sockaddr *)&addr, sizeof(addr)); + if (rc != 0) { + CERROR("Can't bind to port %d\n", port); + goto out_1; + } - rc = sock->ops->listen(sock, kranal_tunables.kra_backlog); - if (rc != 0) { - CERROR("Can't set listen backlog %d: %d\n", + rc = sock->ops->listen(sock, kranal_tunables.kra_backlog); + if (rc != 0) { + CERROR("Can't set listen backlog %d: %d\n", kranal_tunables.kra_backlog, rc); - goto out_2; - } + goto out_1; + } + + LASSERT (kranal_data.kra_listener_sock == NULL); + kranal_data.kra_listener_sock = sock; + + /* unblock waiting parent */ + LASSERT (kranal_data.kra_listener_shutdown == 0); + up(&kranal_data.kra_listener_signal); - LASSERT (kranal_data.kra_listener_sock == NULL); - kranal_data.kra_listener_sock = sock; + /* Wake me any time something happens on my socket */ + add_wait_queue(sock->sk->sk_sleep, &wait); + ras = NULL; - /* unblock waiting parent */ - LASSERT (kranal_data.kra_listener_shutdown == 0); - up(&kranal_data.kra_listener_signal); + while (kranal_data.kra_listener_shutdown == 0) { - /* Wake me any time something happens on my socket */ - add_wait_queue(sock->sk->sk_sleep, &wait); + if (ras == NULL) { + PORTAL_ALLOC(ras, sizeof(*ras)); + if (ras == NULL) { + CERROR("Out of Memory: pausing...\n"); + kranal_pause(HZ); + continue; + } + ras->ras_sock = NULL; + } + + if (ras->ras_sock == NULL) { + ras->ras_sock = sock_alloc(); + if (ras->ras_sock == NULL) { + CERROR("Can't allocate socket: pausing...\n"); + kranal_pause(HZ); + continue; + } + /* XXX this should add a ref to sock->ops->owner, if + * TCP could be a module */ + ras->ras_sock->type = sock->type; + ras->ras_sock->ops = sock->ops; + } - while (kranal_data.kra_listener_shutdown == 0) { + set_current_state(TASK_INTERRUPTIBLE); - newsock = sock_alloc(); - if (newsock == NULL) { - CERROR("Can't allocate new socket for accept\n"); - kranal_pause(HZ); - continue; - } + rc = sock->ops->accept(sock, ras->ras_sock, O_NONBLOCK); - set_current_state(TASK_INTERRUPTIBLE); + /* Sleep for socket activity? */ + if (rc == -EAGAIN && + kranal_data.kra_listener_shutdown == 0) + schedule(); - rc = sock->ops->accept(sock, newsock, O_NONBLOCK); + set_current_state(TASK_RUNNING); - if (rc == -EAGAIN && - kranal_data.kra_listener_shutdown == 0) - schedule(); + if (rc == 0) { + spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); - set_current_state(TASK_RUNNING); + list_add_tail(&ras->ras_list, + &kranal_data.kra_connd_acceptq); - if (rc != 0) { - sock_release(newsock); - if (rc != -EAGAIN) { - CERROR("Accept failed: %d\n", rc); - kranal_pause(HZ); - } - continue; - } + spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); + wake_up(&kranal_data.kra_connd_waitq); - kranal_conn_handshake(newsock, NULL); - sock_release(newsock); - } + ras = NULL; + continue; + } - rc = 0; - remove_wait_queue(sock->sk->sk_sleep, &wait); - out_2: - sock_release(sock); - kranal_data.kra_listener_sock = NULL; + if (rc != -EAGAIN) { + CERROR("Accept failed: %d, pausing...\n", rc); + kranal_pause(HZ); + } + } + + if (ras != NULL) { + if (ras->ras_sock != NULL) + sock_release(ras->ras_sock); + PORTAL_FREE(ras, sizeof(*ras)); + } + + rc = 0; + remove_wait_queue(sock->sk->sk_sleep, &wait); out_1: - PORTAL_FREE(connreqs, 2 * sizeof(*connreqs)); + sock_release(sock); + kranal_data.kra_listener_sock = NULL; out_0: - /* set completion status and unblock thread waiting for me - * (parent on startup failure, executioner on normal shutdown) */ - kranal_data.kra_listener_shutdown = rc; - up(&kranal_data.kra_listener_signal); + /* set completion status and unblock thread waiting for me + * (parent on startup failure, executioner on normal shutdown) */ + kranal_data.kra_listener_shutdown = rc; + up(&kranal_data.kra_listener_signal); - return 0; + return 0; } int kranal_start_listener (void) { - long pid; - int rc; + long pid; + int rc; - CDEBUG(D_WARNING, "Starting listener\n"); + CDEBUG(D_NET, "Starting listener\n"); - /* Called holding kra_nid_mutex: listener stopped */ - LASSERT (kranal_data.kra_listener_sock == NULL); + /* Called holding kra_nid_mutex: listener stopped */ + LASSERT (kranal_data.kra_listener_sock == NULL); - kranal_data.kra_listener_shutdown == 0; - pid = kernel_thread(kranal_listener, NULL, 0); - if (pid < 0) { - CERROR("Can't spawn listener: %ld\n", pid); - return (int)pid; - } + kranal_data.kra_listener_shutdown = 0; + pid = kernel_thread(kranal_listener, NULL, 0); + if (pid < 0) { + CERROR("Can't spawn listener: %ld\n", pid); + return (int)pid; + } - /* Block until listener has started up. */ - down(&kranal_data.kra_listener_signal); + /* Block until listener has started up. */ + down(&kranal_data.kra_listener_signal); - rc = kranal_data.kra_listener_shutdown; - LASSERT ((rc != 0) == (kranal_data.kra_listener_sock == NULL)); + rc = kranal_data.kra_listener_shutdown; + LASSERT ((rc != 0) == (kranal_data.kra_listener_sock == NULL)); - CDEBUG(D_WARNING, "Listener %ld started OK\n", pid); - return rc; + CDEBUG(D_NET, "Listener %ld started OK\n", pid); + return rc; } void -kranal_stop_listener(void) +kranal_stop_listener(int clear_acceptq) { - CDEBUG(D_WARNING, "Stopping listener\n"); + struct list_head zombie_accepts; + unsigned long flags; + kra_acceptsock_t *ras; + + CDEBUG(D_NET, "Stopping listener\n"); + + /* Called holding kra_nid_mutex: listener running */ + LASSERT (kranal_data.kra_listener_sock != NULL); + + kranal_data.kra_listener_shutdown = 1; + wake_up_all(kranal_data.kra_listener_sock->sk->sk_sleep); + + /* Block until listener has torn down. */ + down(&kranal_data.kra_listener_signal); - /* Called holding kra_nid_mutex: listener running */ - LASSERT (kranal_data.kra_listener_sock != NULL); + LASSERT (kranal_data.kra_listener_sock == NULL); + CDEBUG(D_NET, "Listener stopped\n"); - kranal_data.kra_listener_shutdown = 1; - wake_up_all(kranal_data.kra_listener_sock->sk->sk_sleep); + if (!clear_acceptq) + return; + + /* Close any unhandled accepts */ + spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); + + list_add(&zombie_accepts, &kranal_data.kra_connd_acceptq); + list_del_init(&kranal_data.kra_connd_acceptq); - /* Block until listener has torn down. */ - down(&kranal_data.kra_listener_signal); + spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); - LASSERT (kranal_data.kra_listener_sock == NULL); - CDEBUG(D_WARNING, "Listener stopped\n"); + while (!list_empty(&zombie_accepts)) { + ras = list_entry(zombie_accepts.next, + kra_acceptsock_t, ras_list); + list_del(&ras->ras_list); + kranal_free_acceptsock(ras); + } } -int +int kranal_listener_procint(ctl_table *table, int write, struct file *filp, - void *buffer, size_t *lenp) + void *buffer, size_t *lenp) { - int *tunable = (int *)table->data; - int old_val; - int rc; + int *tunable = (int *)table->data; + int old_val; + int rc; - down(&kranal_data.kra_nid_mutex); + /* No race with nal initialisation since the nal is setup all the time + * it's loaded. When that changes, change this! */ + LASSERT (kranal_data.kra_init == RANAL_INIT_ALL); - LASSERT (tunable == &kranal_tunables.kra_port || - tunable == &kranal_tunables.kra_backlog); - old_val = *tunable; + down(&kranal_data.kra_nid_mutex); - rc = proc_dointvec(table, write, filp, buffer, lenp); + LASSERT (tunable == &kranal_tunables.kra_port || + tunable == &kranal_tunables.kra_backlog); + old_val = *tunable; - if (write && - (*tunable != old_val || - kranal_data.kra_listener_sock == NULL)) { + rc = proc_dointvec(table, write, filp, buffer, lenp); - if (kranal_data.kra_listener_sock != NULL) - kranal_stop_listener(); + if (write && + (*tunable != old_val || + kranal_data.kra_listener_sock == NULL)) { - rc = kranal_start_listener(); + if (kranal_data.kra_listener_sock != NULL) + kranal_stop_listener(0); - if (rc != 0) { - *tunable = old_val; - kranal_start_listener(); - } - } + rc = kranal_start_listener(); + + if (rc != 0) { + CWARN("Unable to start listener with new tunable:" + " reverting to old value\n"); + *tunable = old_val; + kranal_start_listener(); + } + } + + up(&kranal_data.kra_nid_mutex); - up(&kranal_data.kra_nid_mutex); - return rc; + LASSERT (kranal_data.kra_init == RANAL_INIT_ALL); + return rc; } int kranal_set_mynid(ptl_nid_t nid) { - lib_ni_t *ni = &kranal_lib.libnal_ni; - int rc = 0; + unsigned long flags; + lib_ni_t *ni = &kranal_lib.libnal_ni; + int rc = 0; CDEBUG(D_NET, "setting mynid to "LPX64" (old nid="LPX64")\n", nid, ni->ni_pid.nid); @@ -1076,13 +1254,16 @@ kranal_set_mynid(ptl_nid_t nid) return 0; } - if (kranal_data.kra_listener_sock != NULL) - kranal_stop_listener(); + if (kranal_data.kra_listener_sock != NULL) + kranal_stop_listener(1); + write_lock_irqsave(&kranal_data.kra_global_lock, flags); + kranal_data.kra_peerstamp++; ni->ni_pid.nid = nid; + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); /* Delete all existing peers and their connections after new - * NID/incarnation set to ensure no old connections in our brave + * NID/connstamp set to ensure no old connections in our brave * new world. */ kranal_del_peer(PTL_NID_ANY, 0); @@ -1109,11 +1290,12 @@ kranal_create_peer (ptl_nid_t nid) peer->rap_nid = nid; atomic_set(&peer->rap_refcount, 1); /* 1 ref for caller */ - INIT_LIST_HEAD(&peer->rap_list); /* not in the peer table yet */ + INIT_LIST_HEAD(&peer->rap_list); + INIT_LIST_HEAD(&peer->rap_connd_list); INIT_LIST_HEAD(&peer->rap_conns); INIT_LIST_HEAD(&peer->rap_tx_queue); - peer->rap_reconnect_time = CURRENT_TIME; + peer->rap_reconnect_time = CURRENT_SECONDS; peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL; atomic_inc(&kranal_data.kra_npeers); @@ -1121,16 +1303,17 @@ kranal_create_peer (ptl_nid_t nid) } void -__kranal_peer_decref (kra_peer_t *peer) +kranal_destroy_peer (kra_peer_t *peer) { CDEBUG(D_NET, "peer "LPX64" %p deleted\n", peer->rap_nid, peer); LASSERT (atomic_read(&peer->rap_refcount) == 0); LASSERT (peer->rap_persistence == 0); LASSERT (!kranal_peer_active(peer)); - LASSERT (peer->rap_connecting == 0); + LASSERT (!peer->rap_connecting); LASSERT (list_empty(&peer->rap_conns)); LASSERT (list_empty(&peer->rap_tx_queue)); + LASSERT (list_empty(&peer->rap_connd_list)); PORTAL_FREE(peer, sizeof(*peer)); @@ -1193,7 +1376,7 @@ kranal_unlink_peer_locked (kra_peer_t *peer) } int -kranal_get_peer_info (int index, ptl_nid_t *nidp, __u32 *ipp, int *portp, +kranal_get_peer_info (int index, ptl_nid_t *nidp, __u32 *ipp, int *portp, int *persistencep) { kra_peer_t *peer; @@ -1387,31 +1570,6 @@ kranal_close_peer_conns_locked (kra_peer_t *peer, int why) } int -kranal_close_stale_conns_locked (kra_peer_t *peer, __u64 incarnation) -{ - kra_conn_t *conn; - struct list_head *ctmp; - struct list_head *cnxt; - int count = 0; - - list_for_each_safe (ctmp, cnxt, &peer->rap_conns) { - conn = list_entry(ctmp, kra_conn_t, rac_list); - - if (conn->rac_peer_incarnation == incarnation) - continue; - - CDEBUG(D_NET, "Closing stale conn nid:"LPX64" incarnation:"LPX64"("LPX64")\n", - peer->rap_nid, conn->rac_peer_incarnation, incarnation); - LASSERT (conn->rac_peer_incarnation < incarnation); - - count++; - kranal_close_conn_locked(conn, -ESTALE); - } - - return count; -} - -int kranal_close_matching_conns (ptl_nid_t nid) { unsigned long flags; @@ -1486,7 +1644,7 @@ kranal_cmd(struct portals_cfg *pcfg, void * private) break; } case NAL_CMD_DEL_PEER: { - rc = kranal_del_peer(pcfg->pcfg_nid, + rc = kranal_del_peer(pcfg->pcfg_nid, /* flags == single_share */ pcfg->pcfg_flags != 0); break; @@ -1499,7 +1657,7 @@ kranal_cmd(struct portals_cfg *pcfg, void * private) else { rc = 0; pcfg->pcfg_nid = conn->rac_peer->rap_nid; - pcfg->pcfg_id = 0; + pcfg->pcfg_id = conn->rac_device->rad_id; pcfg->pcfg_misc = 0; pcfg->pcfg_flags = 0; kranal_conn_decref(conn); @@ -1560,7 +1718,7 @@ kranal_alloc_txdescs(struct list_head *freelist, int n) PORTAL_ALLOC(tx->tx_phys, PTL_MD_MAX_IOV * sizeof(*tx->tx_phys)); if (tx->tx_phys == NULL) { - CERROR("Can't allocate %stx[%d]->tx_phys\n", + CERROR("Can't allocate %stx[%d]->tx_phys\n", isnblk ? "nblk " : "", i); PORTAL_FREE(tx, sizeof(*tx)); @@ -1570,6 +1728,7 @@ kranal_alloc_txdescs(struct list_head *freelist, int n) tx->tx_isnblk = isnblk; tx->tx_buftype = RANAL_BUF_NONE; + tx->tx_msg.ram_type = RANAL_MSG_NONE; list_add(&tx->tx_list, freelist); } @@ -1598,36 +1757,26 @@ kranal_device_init(int id, kra_device_t *dev) goto failed_1; } - rrc = RapkCreatePtag(dev->rad_handle, - &dev->rad_ptag); - if (rrc != RAP_SUCCESS) { - CERROR("Can't create ptag" - " for device %d: %d\n", id, rrc); - goto failed_1; - } - - rrc = RapkCreateCQ(dev->rad_handle, total_ntx, dev->rad_ptag, - &dev->rad_rdma_cq); + rrc = RapkCreateCQ(dev->rad_handle, total_ntx, RAP_CQTYPE_SEND, + &dev->rad_rdma_cqh); if (rrc != RAP_SUCCESS) { CERROR("Can't create rdma cq size %d" " for device %d: %d\n", total_ntx, id, rrc); - goto failed_2; + goto failed_1; } - rrc = RapkCreateCQ(dev->rad_handle, RANAL_FMA_CQ_SIZE, - dev->rad_ptag, &dev->rad_fma_cq); + rrc = RapkCreateCQ(dev->rad_handle, RANAL_FMA_CQ_SIZE, RAP_CQTYPE_RECV, + &dev->rad_fma_cqh); if (rrc != RAP_SUCCESS) { CERROR("Can't create fma cq size %d" " for device %d: %d\n", RANAL_FMA_CQ_SIZE, id, rrc); - goto failed_3; + goto failed_2; } return 0; - failed_3: - RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag); failed_2: - RapkDestroyPtag(dev->rad_handle, dev->rad_ptag); + RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh); failed_1: RapkReleaseDevice(dev->rad_handle); failed_0: @@ -1637,9 +1786,9 @@ kranal_device_init(int id, kra_device_t *dev) void kranal_device_fini(kra_device_t *dev) { - RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cq, dev->rad_ptag); - RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag); - RapkDestroyPtag(dev->rad_handle, dev->rad_ptag); + LASSERT(dev->rad_scheduler == NULL); + RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cqh); + RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh); RapkReleaseDevice(dev->rad_handle); } @@ -1647,9 +1796,8 @@ void kranal_api_shutdown (nal_t *nal) { int i; - int rc; unsigned long flags; - + if (nal->nal_refct != 0) { /* This module got the first ref */ PORTAL_MODULE_UNUSE; @@ -1685,7 +1833,7 @@ kranal_api_shutdown (nal_t *nal) "waiting for %d peers and %d conns to close down\n", atomic_read(&kranal_data.kra_npeers), atomic_read(&kranal_data.kra_nconns)); - kranal_pause(HZ); + kranal_pause(HZ); } /* fall through */ @@ -1697,13 +1845,19 @@ kranal_api_shutdown (nal_t *nal) break; } + /* Conn/Peer state all cleaned up BEFORE setting shutdown, so threads + * don't have to worry about shutdown races */ + LASSERT (atomic_read(&kranal_data.kra_nconns) == 0); + LASSERT (atomic_read(&kranal_data.kra_npeers) == 0); + /* flag threads to terminate; wake and wait for them to die */ kranal_data.kra_shutdown = 1; for (i = 0; i < kranal_data.kra_ndevs; i++) { kra_device_t *dev = &kranal_data.kra_devices[i]; - LASSERT (list_empty(&dev->rad_connq)); + LASSERT (list_empty(&dev->rad_ready_conns)); + LASSERT (list_empty(&dev->rad_new_conns)); spin_lock_irqsave(&dev->rad_lock, flags); wake_up(&dev->rad_waitq); @@ -1715,9 +1869,9 @@ kranal_api_shutdown (nal_t *nal) spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags); LASSERT (list_empty(&kranal_data.kra_connd_peers)); - spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); + spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); wake_up_all(&kranal_data.kra_connd_waitq); - spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); + spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); i = 2; while (atomic_read(&kranal_data.kra_nthreads) != 0) { @@ -1734,7 +1888,7 @@ kranal_api_shutdown (nal_t *nal) LASSERT (list_empty(&kranal_data.kra_peers[i])); PORTAL_FREE(kranal_data.kra_peers, - sizeof (struct list_head) * + sizeof (struct list_head) * kranal_data.kra_peer_hash_size); } @@ -1744,7 +1898,7 @@ kranal_api_shutdown (nal_t *nal) LASSERT (list_empty(&kranal_data.kra_conns[i])); PORTAL_FREE(kranal_data.kra_conns, - sizeof (struct list_head) * + sizeof (struct list_head) * kranal_data.kra_conn_hash_size); } @@ -1767,8 +1921,6 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid, ptl_ni_limits_t *requested_limits, ptl_ni_limits_t *actual_limits) { - static int device_ids[] = {RAPK_MAIN_DEVICE_ID, - RAPK_EXPANSION_DEVICE_ID}; struct timeval tv; ptl_process_id_t process_id; int pkmem = atomic_read(&portal_kmemory); @@ -1791,13 +1943,14 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid, memset(&kranal_data, 0, sizeof(kranal_data)); /* zero pointers, flags etc */ /* CAVEAT EMPTOR: Every 'Fma' message includes the sender's NID and - * a unique (for all time) incarnation so we can uniquely identify - * the sender. The incarnation is an incrementing counter + * a unique (for all time) connstamp so we can uniquely identify + * the sender. The connstamp is an incrementing counter * initialised with seconds + microseconds at startup time. So we * rely on NOT creating connections more frequently on average than - * 1MHz to ensure we don't use old incarnations when we reboot. */ + * 1MHz to ensure we don't use old connstamps when we reboot. */ do_gettimeofday(&tv); - kranal_data.kra_next_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec; + kranal_data.kra_connstamp = + kranal_data.kra_peerstamp = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec; init_MUTEX(&kranal_data.kra_nid_mutex); init_MUTEX_LOCKED(&kranal_data.kra_listener_signal); @@ -1808,14 +1961,17 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid, kra_device_t *dev = &kranal_data.kra_devices[i]; dev->rad_idx = i; - INIT_LIST_HEAD(&dev->rad_connq); + INIT_LIST_HEAD(&dev->rad_ready_conns); + INIT_LIST_HEAD(&dev->rad_new_conns); init_waitqueue_head(&dev->rad_waitq); spin_lock_init(&dev->rad_lock); } + kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT; init_waitqueue_head(&kranal_data.kra_reaper_waitq); spin_lock_init(&kranal_data.kra_reaper_lock); + INIT_LIST_HEAD(&kranal_data.kra_connd_acceptq); INIT_LIST_HEAD(&kranal_data.kra_connd_peers); init_waitqueue_head(&kranal_data.kra_connd_waitq); spin_lock_init(&kranal_data.kra_connd_lock); @@ -1827,7 +1983,7 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid, /* OK to call kranal_api_shutdown() to cleanup now */ kranal_data.kra_init = RANAL_INIT_DATA; - + kranal_data.kra_peer_hash_size = RANAL_PEER_HASH_SIZE; PORTAL_ALLOC(kranal_data.kra_peers, sizeof(struct list_head) * kranal_data.kra_peer_hash_size); @@ -1875,7 +2031,7 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid, } for (i = 0; i < RANAL_N_CONND; i++) { - rc = kranal_thread_start(kranal_connd, (void *)i); + rc = kranal_thread_start(kranal_connd, (void *)(unsigned long)i); if (rc != 0) { CERROR("Can't spawn ranal connd[%d]: %d\n", i, rc); @@ -1883,14 +2039,25 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid, } } - LASSERT(kranal_data.kra_ndevs == 0); - for (i = 0; i < sizeof(device_ids)/sizeof(device_ids[0]); i++) { + LASSERT (kranal_data.kra_ndevs == 0); + + for (i = 0; i < sizeof(kranal_devids)/sizeof(kranal_devids[0]); i++) { + LASSERT (i < RANAL_MAXDEVS); + dev = &kranal_data.kra_devices[kranal_data.kra_ndevs]; - rc = kranal_device_init(device_ids[i], dev); + rc = kranal_device_init(kranal_devids[i], dev); if (rc == 0) kranal_data.kra_ndevs++; - + } + + if (kranal_data.kra_ndevs == 0) { + CERROR("Can't initialise any RapidArray devices\n"); + goto failed; + } + + for (i = 0; i < kranal_data.kra_ndevs; i++) { + dev = &kranal_data.kra_devices[i]; rc = kranal_thread_start(kranal_scheduler, dev); if (rc != 0) { CERROR("Can't spawn ranal scheduler[%d]: %d\n", @@ -1899,9 +2066,6 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid, } } - if (kranal_data.kra_ndevs == 0) - goto failed; - rc = libcfs_nal_cmd_register(RANAL, &kranal_cmd, NULL); if (rc != 0) { CERROR("Can't initialise command interface (rc = %d)\n", rc); @@ -1919,17 +2083,16 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid, return PTL_OK; failed: - kranal_api_shutdown(&kranal_api); + kranal_api_shutdown(&kranal_api); return PTL_FAIL; } void __exit kranal_module_fini (void) { -#ifdef CONFIG_SYSCTL if (kranal_tunables.kra_sysctl != NULL) unregister_sysctl_table(kranal_tunables.kra_sysctl); -#endif + PtlNIFini(kranal_ni); ptl_unregister_nal(RANAL); @@ -1953,6 +2116,10 @@ kranal_module_init (void) /* Initialise dynamic tunables to defaults once only */ kranal_tunables.kra_timeout = RANAL_TIMEOUT; + kranal_tunables.kra_listener_timeout = RANAL_LISTENER_TIMEOUT; + kranal_tunables.kra_backlog = RANAL_BACKLOG; + kranal_tunables.kra_port = RANAL_PORT; + kranal_tunables.kra_max_immediate = RANAL_MAX_IMMEDIATE; rc = ptl_register_nal(RANAL, &kranal_api); if (rc != PTL_OK) { @@ -1967,11 +2134,15 @@ kranal_module_init (void) return -ENODEV; } -#ifdef CONFIG_SYSCTL - /* Press on regardless even if registering sysctl doesn't work */ - kranal_tunables.kra_sysctl = + kranal_tunables.kra_sysctl = register_sysctl_table(kranal_top_ctl_table, 0); -#endif + if (kranal_tunables.kra_sysctl == NULL) { + CERROR("Can't register sysctl table\n"); + PtlNIFini(kranal_ni); + ptl_unregister_nal(RANAL); + return -ENOMEM; + } + return 0; }